Support any inner join on a reference table (#3323)

This PR works by doing two things:
1. Expand the notion of a join condition to any expression that contains
   columns from two or more tables.
2. Support cartesian products on reference tables.

Cartesian products on reference tables are considered in the join order planner
as the least desirable join (except for normal cartesian products). That way
they will be done at the end of the join. This is preferable since the
cartesian product multiplies the rows. By doing it at the end at least these
multiplications of rows will not be sent over the network when doing
repartitioning, only when sending to the master.

Fixes #3079
Fixes #3198
pull/3346/head
Jelte Fennema 2019-12-27 15:14:50 +01:00 committed by GitHub
commit 0cd5d6ac49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 745 additions and 177 deletions

View File

@ -76,6 +76,10 @@ static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType);
static char * JoinRuleName(JoinRuleType ruleType);
static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn,
@ -212,7 +216,6 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex)
static bool
JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
{
ListCell *joinClauseCell = NULL;
if (currentColumn == NULL || candidateColumn == NULL)
{
/*
@ -222,11 +225,16 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
return false;
}
foreach(joinClauseCell, joinClauseList)
Node *joinClause = NULL;
foreach_ptr(joinClause, joinClauseList)
{
OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell));
Var *leftColumn = LeftColumnOrNULL(joinClause);
Var *rightColumn = RightColumnOrNULL(joinClause);
if (!NodeIsEqualsOpExpr(joinClause))
{
continue;
}
OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause);
Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr);
Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr);
/*
* Check if both join columns and both partition key columns match, since the
@ -249,6 +257,22 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
}
/*
* NodeIsEqualsOpExpr checks if the node is an OpExpr, where the operator
* matches OperatorImplementsEquality.
*/
bool
NodeIsEqualsOpExpr(Node *node)
{
if (!IsA(node, OpExpr))
{
return false;
}
OpExpr *opExpr = castNode(OpExpr, node);
return OperatorImplementsEquality(opExpr->opno);
}
/*
* JoinOrderList calculates the best join order and join rules that apply given
* the list of tables and join clauses. First, the function generates a set of
@ -732,6 +756,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType)
RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] =
&CartesianProductReferenceJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct;
ruleEvalFunctionsInitialized = true;
@ -760,6 +786,8 @@ JoinRuleName(JoinRuleType ruleType)
RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] =
strdup("single range partition join");
RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join");
RuleNameArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] = strdup(
"cartesian product reference join");
RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product");
ruleNamesInitialized = true;
@ -781,48 +809,76 @@ static JoinOrderNode *
ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType)
{
JoinOrderNode *nextJoinNode = NULL;
int applicableJoinCount = list_length(applicableJoinClauses);
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
bool performReferenceJoin = false;
if (applicableJoinCount <= 0)
{
return NULL;
}
/*
* If the table is a reference table, then the reference join is feasible.It
* is valid only for inner joins.
*
* Right join requires existing (left) table to be reference table, full outer
* join requires both tables to be reference tables.
*/
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
if (!IsSupportedReferenceJoin(joinType,
leftPartitionMethod == DISTRIBUTE_BY_NONE,
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
{
return NULL;
}
return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
}
/*
* IsSupportedReferenceJoin checks if with this join type we can safely do a simple join
* on the reference table on all the workers.
*/
bool
IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable)
{
if ((joinType == JOIN_INNER || joinType == JOIN_LEFT || joinType == JOIN_ANTI) &&
candidatePartitionMethod == DISTRIBUTE_BY_NONE)
rightIsReferenceTable)
{
performReferenceJoin = true;
return true;
}
else if (joinType == JOIN_RIGHT && leftPartitionMethod == DISTRIBUTE_BY_NONE)
else if ((joinType == JOIN_RIGHT) &&
leftIsReferenceTable)
{
performReferenceJoin = true;
return true;
}
else if (joinType == JOIN_FULL && leftPartitionMethod == DISTRIBUTE_BY_NONE &&
candidatePartitionMethod == DISTRIBUTE_BY_NONE)
else if (joinType == JOIN_FULL && leftIsReferenceTable && rightIsReferenceTable)
{
performReferenceJoin = true;
return true;
}
return false;
}
if (performReferenceJoin)
{
nextJoinNode = MakeJoinOrderNode(candidateTable, REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
}
return nextJoinNode;
/*
* ReferenceJoin evaluates if the candidate table is a reference table for inner,
* left and anti join. For right join, current join node must be represented by
* a reference table. For full join, both of them must be a reference table.
*/
static JoinOrderNode *
CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType)
{
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
if (!IsSupportedReferenceJoin(joinType,
leftPartitionMethod == DISTRIBUTE_BY_NONE,
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
{
return NULL;
}
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
}
@ -1006,17 +1062,21 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
OpExpr *
SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses)
{
ListCell *applicableJoinClauseCell = NULL;
if (partitionColumn == NULL)
{
return NULL;
}
foreach(applicableJoinClauseCell, applicableJoinClauses)
Node *applicableJoinClause = NULL;
foreach_ptr(applicableJoinClause, applicableJoinClauses)
{
OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell));
Var *leftColumn = LeftColumnOrNULL(applicableJoinClause);
Var *rightColumn = RightColumnOrNULL(applicableJoinClause);
if (!NodeIsEqualsOpExpr(applicableJoinClause))
{
continue;
}
OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause);
Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr);
Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
/* not a simple partition column join */
@ -1034,7 +1094,7 @@ SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses)
{
if (leftColumn->vartype == rightColumn->vartype)
{
return applicableJoinClause;
return applicableJoinOpExpr;
}
else
{
@ -1081,13 +1141,16 @@ DualPartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
OpExpr *
DualPartitionJoinClause(List *applicableJoinClauses)
{
ListCell *applicableJoinClauseCell = NULL;
foreach(applicableJoinClauseCell, applicableJoinClauses)
Node *applicableJoinClause = NULL;
foreach_ptr(applicableJoinClause, applicableJoinClauses)
{
OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell);
Var *leftColumn = LeftColumnOrNULL(applicableJoinClause);
Var *rightColumn = RightColumnOrNULL(applicableJoinClause);
if (!NodeIsEqualsOpExpr(applicableJoinClause))
{
continue;
}
OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause);
Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr);
Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
@ -1097,7 +1160,7 @@ DualPartitionJoinClause(List *applicableJoinClauses)
/* we only need to check that the join column types match */
if (leftColumn->vartype == rightColumn->vartype)
{
return applicableJoinClause;
return applicableJoinOpExpr;
}
else
{
@ -1158,9 +1221,9 @@ MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType,
* in either the list of tables on the left *or* in the right hand table.
*/
bool
IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, OpExpr *joinClause)
IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinClause)
{
List *varList = pull_var_clause_default((Node *) joinClause);
List *varList = pull_var_clause_default(joinClause);
Var *var = NULL;
bool joinContainsRightTable = false;
foreach_ptr(var, varList)
@ -1196,15 +1259,14 @@ IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, OpExpr *joinC
List *
ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList)
{
ListCell *joinClauseCell = NULL;
List *applicableJoinClauses = NIL;
/* make sure joinClauseList contains only join clauses */
joinClauseList = JoinClauseList(joinClauseList);
foreach(joinClauseCell, joinClauseList)
Node *joinClause = NULL;
foreach_ptr(joinClause, joinClauseList)
{
OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell));
if (IsApplicableJoinClause(leftTableIdList, rightTableId, joinClause))
{
applicableJoinClauses = lappend(applicableJoinClauses, joinClause);

View File

@ -115,6 +115,11 @@ static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *righ
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn,
JoinType joinType,
List *joinClauses);
static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
@ -1429,27 +1434,6 @@ IsJoinClause(Node *clause)
{
Var *var = NULL;
if (!IsA(clause, OpExpr))
{
return false;
}
OpExpr *operatorExpression = castNode(OpExpr, clause);
bool equalsOperator = OperatorImplementsEquality(operatorExpression->opno);
if (!equalsOperator)
{
/*
* The single and dual repartition join and local join planners expect the clauses
* to be equi-join to calculate a hash on which to distribute.
*
* In the future we should move this clause to those planners and allow
* non-equi-join's in the reference join and cartesian product. This is tracked in
* https://github.com/citusdata/citus/issues/3198
*/
return false;
}
/*
* take all column references from the clause, if we find 2 column references from a
* different relation we assume this is a join clause
@ -1705,7 +1689,7 @@ MultiSelectNode(List *whereClauseList)
foreach(whereClauseCell, whereClauseList)
{
Node *whereClause = (Node *) lfirst(whereClauseCell);
if (IsSelectClause(whereClause) || or_clause(whereClause))
if (IsSelectClause(whereClause))
{
selectClauseList = lappend(selectClauseList, whereClause);
}
@ -2043,6 +2027,8 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] =
&ApplySingleRangePartitionJoin;
RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] =
&ApplyCartesianProductReferenceJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct;
ruleApplyFunctionInitialized = true;
@ -2076,6 +2062,28 @@ ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
}
/*
* ApplyCartesianProductReferenceJoin creates a new MultiJoin node that joins
* the left and the right node. The new node uses the broadcast join rule to
* perform the join.
*/
static MultiNode *
ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
joinNode->joinRuleType = CARTESIAN_PRODUCT_REFERENCE_JOIN;
joinNode->joinType = joinType;
joinNode->joinClauseList = applicableJoinClauses;
SetLeftChild((MultiBinaryNode *) joinNode, leftNode);
SetRightChild((MultiBinaryNode *) joinNode, rightNode);
return (MultiNode *) joinNode;
}
/*
* ApplyLocalJoin creates a new MultiJoin node that joins the left and the right
* node. The new node uses the local join rule to perform the join.

View File

@ -40,6 +40,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
@ -3456,6 +3457,28 @@ FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
}
/*
* NodeIsRangeTblRefReferenceTable checks if the node is a RangeTblRef that
* points to a reference table in the rangeTableList.
*/
static bool
NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
{
if (!IsA(node, RangeTblRef))
{
return false;
}
RangeTblRef *tableRef = castNode(RangeTblRef, node);
RangeTblEntry *rangeTableEntry = rt_fetch(tableRef->rtindex, rangeTableList);
CitusRTEKind rangeTableType = GetRangeTblKind(rangeTableEntry);
if (rangeTableType != CITUS_RTE_RELATION)
{
return false;
}
return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE;
}
/*
* JoinSequenceArray walks over the join nodes in the job query and constructs a join
* sequence containing an entry for each joined table. The function then returns an
@ -3495,19 +3518,26 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen
foreach(joinExprCell, joinExprList)
{
JoinExpr *joinExpr = (JoinExpr *) lfirst(joinExprCell);
RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg;
RangeTblRef *rightTableRef = castNode(RangeTblRef, joinExpr->rarg);
uint32 nextRangeTableId = rightTableRef->rtindex;
ListCell *nextJoinClauseCell = NULL;
Index existingRangeTableId = 0;
bool applyJoinPruning = false;
List *nextJoinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
bool leftIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->larg,
rangeTableList);
bool rightIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->rarg,
rangeTableList);
bool isReferenceJoin = IsSupportedReferenceJoin(joinExpr->jointype,
leftIsReferenceTable,
rightIsReferenceTable);
/*
* If next join clause list is empty, the user tried a cartesian product
* between tables. We don't support this functionality, and error out.
* between tables. We don't support this functionality for non
* reference joins, and error out.
*/
if (nextJoinClauseList == NIL)
if (nextJoinClauseList == NIL && !isReferenceJoin)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this query"),
@ -3518,17 +3548,23 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen
* We now determine if we can apply join pruning between existing range
* tables and this new one.
*/
foreach(nextJoinClauseCell, nextJoinClauseList)
Node *nextJoinClause = NULL;
foreach_ptr(nextJoinClause, nextJoinClauseList)
{
OpExpr *nextJoinClause = (OpExpr *) lfirst(nextJoinClauseCell);
if (!IsJoinClause((Node *) nextJoinClause))
if (!NodeIsEqualsOpExpr(nextJoinClause))
{
continue;
}
Var *leftColumn = LeftColumnOrNULL(nextJoinClause);
Var *rightColumn = RightColumnOrNULL(nextJoinClause);
OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, nextJoinClause);
if (!IsJoinClause((Node *) nextJoinClauseOpExpr))
{
continue;
}
Var *leftColumn = LeftColumnOrNULL(nextJoinClauseOpExpr);
Var *rightColumn = RightColumnOrNULL(nextJoinClauseOpExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
continue;
@ -3567,7 +3603,7 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen
if (leftPartitioned && rightPartitioned)
{
/* make sure this join clause references only simple columns */
CheckJoinBetweenColumns(nextJoinClause);
CheckJoinBetweenColumns(nextJoinClauseOpExpr);
applyJoinPruning = true;
break;

View File

@ -15,6 +15,8 @@
#ifndef MULTI_JOIN_ORDER_H
#define MULTI_JOIN_ORDER_H
#include "postgres.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
@ -33,7 +35,8 @@ typedef enum JoinRuleType
SINGLE_HASH_PARTITION_JOIN = 3,
SINGLE_RANGE_PARTITION_JOIN = 4,
DUAL_PARTITION_JOIN = 5,
CARTESIAN_PRODUCT = 6,
CARTESIAN_PRODUCT_REFERENCE_JOIN = 6,
CARTESIAN_PRODUCT = 7,
/*
* Add new join rule types above this comment. After adding, you must also
@ -83,9 +86,12 @@ extern bool EnableSingleHashRepartitioning;
extern List * JoinExprList(FromExpr *fromExpr);
extern List * JoinOrderList(List *rangeTableEntryList, List *joinClauseList);
extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId,
OpExpr *joinClause);
Node *joinClause);
extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId,
List *joinClauseList);
extern bool NodeIsEqualsOpExpr(Node *node);
extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable);
extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn,
List *applicableJoinClauses);
extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses);

View File

@ -185,7 +185,51 @@ DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_
(1 row)
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned
-- recursively planned, however it can be planned using the repartition planner
SET citus.enable_repartition_joins to on;
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
ORDER BY foo_inner_1.tenant_id;
tenant_id
-----------
14
24
34
4
44
54
64
74
84
94
(10 rows)
RESET citus.enable_repartition_joins;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned, this one can not be planned by the repartion planner
-- because of the IN query on a non unique column
UPDATE
second_distributed_table
SET
@ -201,8 +245,7 @@ FROM
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
second_distributed_table.dept IN (select dept from second_distributed_table))
foo_inner_1 JOIN LATERAL
(
SELECT
@ -218,6 +261,7 @@ FROM
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
) as foo
RETURNING *;
DEBUG: generating subplan 15_1 for subquery SELECT dept FROM recursive_dml_queries.second_distributed_table
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- again a corrolated subquery
-- this time distribution key eq. exists
@ -253,8 +297,8 @@ ERROR: complex joins are only supported when all distributed tables are joined
INSERT INTO
second_distributed_table (tenant_id, dept)
VALUES ('3', (WITH vals AS (SELECT 3) select * from vals));
DEBUG: generating subplan 18_1 for CTE vals: SELECT 3
DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals))
DEBUG: generating subplan 20_1 for CTE vals: SELECT 3
DEBUG: Plan 20 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals))
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO
@ -277,8 +321,8 @@ UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
WITH cte_1 AS (
WITH cte_2 AS (
SELECT tenant_id as cte2_id
@ -293,8 +337,8 @@ UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
DEBUG: generating subplan 24_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-- we don't support updating local table with a join with
-- distributed tables
UPDATE

View File

@ -41,8 +41,11 @@ ORDER BY 1,2,3;
2 | 2 | 2 | 4 | 4
(4 rows)
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found
-- The join clause is wider than it used to be, causing this query to be
-- recognized by the LogicalPlanner as a repartition join.
-- Due to a three-way join this causes no valid path, besides the cartesian
-- product on reference tables. This is allowed, so it should be able to be
-- planned.
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
@ -50,7 +53,19 @@ FROM
ref b
WHERE t2.y - a.a - b.b = 0
ORDER BY 1,2,3;
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
y | x | x | a | b | a | b
---+---+---+---+---+---+---
(0 rows)
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path to be found
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
test a,
test b
WHERE t2.y - a.x - b.x = 0
ORDER BY 1,2,3;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SET client_min_messages TO WARNING;
DROP SCHEMA expression_reference_join CASCADE;

View File

@ -99,9 +99,7 @@ LOG: join order: [ "lineitem" ][ local partition join "orders" ]
EXPLAIN SELECT l_quantity FROM lineitem, orders
WHERE (l_orderkey = o_orderkey OR l_quantity > 5);
LOG: join order: [ "lineitem" ][ cartesian product "orders" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
EXPLAIN SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ]

View File

@ -804,11 +804,11 @@ INSERT INTO colocated_table_test_2 VALUES (2, 2.0, '2', '2016-12-02');
\c - - - :worker_1_port
SET client_min_messages TO DEBUG1;
SET citus.log_multi_join_order TO TRUE;
SELECT
SELECT
reference_table_test.value_1
FROM
FROM
reference_table_test, colocated_table_test
WHERE
WHERE
colocated_table_test.value_1 = reference_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
@ -818,11 +818,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test
WHERE
FROM
reference_table_test, colocated_table_test
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
@ -832,11 +832,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
SELECT
colocated_table_test.value_2
FROM
FROM
colocated_table_test, reference_table_test
WHERE
WHERE
reference_table_test.value_1 = colocated_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
@ -846,21 +846,29 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
colocated_table_test.value_2
FROM
SET citus.enable_repartition_joins = on;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
SELECT
colocated_table_test.value_2
FROM
ORDER BY colocated_table_test.value_2;
LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ]
value_2
---------
1
1
2
2
(4 rows)
RESET citus.enable_repartition_joins;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ local partition join "colocated_table_test_2" ]
@ -871,11 +879,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
(2 rows)
SET citus.task_executor_type to "task-tracker";
SELECT
colocated_table_test.value_2
FROM
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]
@ -885,11 +893,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
reference_table_test.value_2
FROM
SELECT
reference_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]

View File

@ -1004,11 +1004,11 @@ INSERT INTO colocated_table_test_2 VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO colocated_table_test_2 VALUES (2, 2.0, '2', '2016-12-02');
SET client_min_messages TO DEBUG1;
SET citus.log_multi_join_order TO TRUE;
SELECT
SELECT
reference_table_test.value_1
FROM
FROM
reference_table_test, colocated_table_test
WHERE
WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
value_1
@ -1017,11 +1017,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test
WHERE
FROM
reference_table_test, colocated_table_test
WHERE
colocated_table_test.value_2 = reference_table_test.value_2;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
value_2
@ -1030,11 +1030,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
SELECT
colocated_table_test.value_2
FROM
FROM
colocated_table_test, reference_table_test
WHERE
WHERE
reference_table_test.value_1 = colocated_table_test.value_1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
value_2
@ -1043,20 +1043,29 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
colocated_table_test.value_2
FROM
SET citus.enable_repartition_joins = on;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_2 = reference_table_test.value_2;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
SELECT
colocated_table_test.value_2
FROM
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY colocated_table_test.value_2;
LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ]
value_2
---------
1
1
2
2
(4 rows)
RESET citus.enable_repartition_joins;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ local partition join "colocated_table_test_2" ]
value_2
@ -1066,11 +1075,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
(2 rows)
SET citus.task_executor_type to "task-tracker";
SELECT
colocated_table_test.value_2
FROM
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]
value_2
@ -1079,11 +1088,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2
(2 rows)
SELECT
reference_table_test.value_2
FROM
SELECT
reference_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
WHERE
colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]
value_2
@ -1096,7 +1105,7 @@ SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
SET citus.task_executor_type to "adaptive";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables
-- should go via coordinator since we're inserting into reference table where
-- should go via coordinator since we're inserting into reference table where
-- not all the participants are reference tables
INSERT INTO
reference_table_test (value_1)
@ -1122,7 +1131,7 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
-- safe to push down even lack of equality between partition column and column of reference table
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
SELECT
colocated_table_test_2.value_1, reference_table_test.value_2
FROM
colocated_table_test_2, reference_table_test
@ -1135,10 +1144,10 @@ RETURNING value_1, value_2;
2 | 2
(2 rows)
-- similar query with the above, this time partition key but without equality
-- similar query with the above, this time partition key but without equality
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
SELECT
colocated_table_test_2.value_1, reference_table_test.value_2
FROM
colocated_table_test_2, reference_table_test
@ -1251,7 +1260,7 @@ WHERE
2 | 2
(2 rows)
-- let's now test TRUNCATE and DROP TABLE
-- let's now test TRUNCATE and DROP TABLE
-- delete all rows and ingest some data
DELETE FROM reference_table_test;
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
@ -1442,7 +1451,7 @@ CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS '
FROM
reference_table_test;
' LANGUAGE SQL;
CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
RETURNS void AS '
INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
' LANGUAGE SQL;
@ -1497,7 +1506,7 @@ SELECT select_count_all();
TRUNCATE reference_table_test;
-- some prepared queries and pl/pgsql functions
PREPARE insert_into_ref_table_pr (int, float, text, timestamp)
PREPARE insert_into_ref_table_pr (int, float, text, timestamp)
AS INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
-- reference tables do not have up-to-five execution limit as other tables
EXECUTE insert_into_ref_table_pr(1, 1.0, '1', '2016-12-01');
@ -1587,7 +1596,7 @@ ROLLBACK;
-- clean up tables, ...
SET client_min_messages TO ERROR;
DROP SEQUENCE example_ref_value_seq;
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite,
colocated_table_test, colocated_table_test_2, append_reference_tmp_table;
DROP TYPE reference_comp_key;

View File

@ -0,0 +1,226 @@
SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
SET citus.enable_repartition_joins to on;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND l_suppkey < s_suppkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
195 | 196 | 804
245 | 246 | 754
278 | 279 | 721
308 | 309 | 691
309 | 310 | 1380
350 | 351 | 649
358 | 359 | 641
574 | 575 | 425
641 | 642 | 358
654 | 655 | 345
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND int4eq(l_suppkey, s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
195 | 196 | 1
245 | 246 | 1
278 | 279 | 1
308 | 309 | 1
309 | 310 | 2
350 | 351 | 1
358 | 359 | 1
574 | 575 | 1
641 | 642 | 1
654 | 655 | 1
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND NOT int4ne(l_suppkey, s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
195 | 196 | 1
245 | 246 | 1
278 | 279 | 1
308 | 309 | 1
309 | 310 | 2
350 | 351 | 1
358 | 359 | 1
574 | 575 | 1
641 | 642 | 1
654 | 655 | 1
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ cartesian product reference join "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
18 | 7519 | 1000
79 | 7580 | 1000
91 | 2592 | 1000
149 | 5150 | 1000
149 | 7650 | 1000
175 | 5176 | 1000
179 | 2680 | 1000
182 | 7683 | 1000
195 | 196 | 1000
204 | 7705 | 1000
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (int4eq(l_suppkey, s_suppkey) OR l_suppkey = s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
195 | 196 | 1
245 | 246 | 1
278 | 279 | 1
308 | 309 | 1
309 | 310 | 2
350 | 351 | 1
358 | 359 | 1
574 | 575 | 1
641 | 642 | 1
654 | 655 | 1
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (int4eq(l_suppkey, s_suppkey) OR random() > 2)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
195 | 196 | 1
245 | 246 | 1
278 | 279 | 1
308 | 309 | 1
309 | 310 | 2
350 | 351 | 1
358 | 359 | 1
574 | 575 | 1
641 | 642 | 1
654 | 655 | 1
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (l_suppkey = 1 OR s_suppkey = 1)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
18 | 7519 | 1
79 | 7580 | 1
91 | 2592 | 1
149 | 5150 | 1
149 | 7650 | 1
175 | 5176 | 1
179 | 2680 | 1
182 | 7683 | 1
195 | 196 | 1
204 | 7705 | 1
(10 rows)
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND l_partkey + p_partkey = s_suppkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ reference join "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
-----------+-----------+-------
18 | 7519 | 1
79 | 7580 | 1
91 | 2592 | 1
149 | 5150 | 1
149 | 7650 | 1
175 | 5176 | 1
179 | 2680 | 1
182 | 7683 | 1
195 | 196 | 1
204 | 7705 | 1
(10 rows)

View File

@ -112,7 +112,7 @@ test: multi_join_order_tpch_repartition
# new shards before these tests, as they expect specific shard identifiers in
# the output.
# ----------
test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment
test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment multi_repartition_join_ref
test: adaptive_executor_repartition
# ---------

View File

@ -143,7 +143,39 @@ RETURNING
distributed_table.*;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned
-- recursively planned, however it can be planned using the repartition planner
SET citus.enable_repartition_joins to on;
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
ORDER BY foo_inner_1.tenant_id;
RESET citus.enable_repartition_joins;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned, this one can not be planned by the repartion planner
-- because of the IN query on a non unique column
UPDATE
second_distributed_table
SET
@ -159,8 +191,7 @@ FROM
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
second_distributed_table.dept IN (select dept from second_distributed_table))
foo_inner_1 JOIN LATERAL
(
SELECT

View File

@ -27,8 +27,11 @@ FROM
WHERE t2.y * 2 = a.a
ORDER BY 1,2,3;
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found
-- The join clause is wider than it used to be, causing this query to be
-- recognized by the LogicalPlanner as a repartition join.
-- Due to a three-way join this causes no valid path, besides the cartesian
-- product on reference tables. This is allowed, so it should be able to be
-- planned.
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
@ -37,5 +40,17 @@ FROM
WHERE t2.y - a.a - b.b = 0
ORDER BY 1,2,3;
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path to be found
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
test a,
test b
WHERE t2.y - a.x - b.x = 0
ORDER BY 1,2,3;
SET client_min_messages TO WARNING;
DROP SCHEMA expression_reference_join CASCADE;

View File

@ -518,13 +518,15 @@ WHERE
ORDER BY 1;
SET citus.enable_repartition_joins = on;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
ORDER BY colocated_table_test.value_2;
RESET citus.enable_repartition_joins;
SELECT
colocated_table_test.value_2

View File

@ -656,12 +656,15 @@ FROM
WHERE
reference_table_test.value_1 = colocated_table_test.value_1;
SET citus.enable_repartition_joins = on;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_2 = reference_table_test.value_2;
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY colocated_table_test.value_2;
RESET citus.enable_repartition_joins;
SELECT
colocated_table_test.value_2

View File

@ -0,0 +1,105 @@
SET citus.log_multi_join_order to TRUE;
SET client_min_messages to DEBUG1;
SET citus.enable_repartition_joins to on;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND l_suppkey < s_suppkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND int4eq(l_suppkey, s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND NOT int4ne(l_suppkey, s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (int4eq(l_suppkey, s_suppkey) OR l_suppkey = s_suppkey)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (int4eq(l_suppkey, s_suppkey) OR random() > 2)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND (l_suppkey = 1 OR s_suppkey = 1)
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
SELECT
l_partkey, l_suppkey, count(*)
FROM
lineitem, part_append, supplier
WHERE
l_partkey = p_partkey
AND l_partkey + p_partkey = s_suppkey
GROUP BY
l_partkey, l_suppkey
ORDER BY
l_partkey, l_suppkey
LIMIT 10;