directly pass join and where clause when we have outer joins in query

outer-join-noncolocated-dist-tables
aykutbozkurt 2023-01-05 21:47:26 +03:00
parent 3c4f478f7b
commit e06bed1a7f
23 changed files with 500 additions and 325 deletions

View File

@ -54,8 +54,6 @@ static RuleEvalFunction RuleEvalFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join r
/* Local functions forward declarations */
static bool JoinExprListWalker(Node *node, List **joinList);
static bool ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex);
static bool ExtractRightMostRangeTableIndex(Node *node, int *rangeTableIndex);
static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList,
List *joinClauseList);
static List * BestJoinOrder(List *candidateJoinOrders);
@ -65,16 +63,19 @@ static List * LatestLargeDataTransfer(List *candidateJoinOrders);
static void PrintJoinOrderList(List *joinOrder);
static uint32 LargeDataTransferLocation(List *joinOrder);
static List * TableEntryListDifference(List *lhsTableList, List *rhsTableList);
static bool JoinTypeJoinExprWalker(Node *node, JoinTypeContext *joinTypeContext);
static JoinType * FindJoinTypeBetweenTables(List *joinExprList, List *leftTableIdxList,
uint32 rtableIdx);
static bool ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext);
static bool JoinInfoContextHasAntiJoin(JoinInfoContext *joinOrderContext);
static const char * JoinTypeName(JoinType jointype);
/* Local functions forward declarations for join evaluations */
static JoinOrderNode * EvaluateJoinRules(List *joinedTableList,
JoinOrderNode *currentJoinNode,
TableEntry *candidateTable,
List *joinClauseList, JoinType joinType);
List *joinClauseList,
JoinType joinType,
bool passJoinClauseDirectly);
static List * RangeTableIdList(List *tableList);
static TableEntry * TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx);
static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType);
static char * JoinRuleName(JoinRuleType ruleType);
static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
@ -183,7 +184,7 @@ JoinExprListWalker(Node *node, List **joinList)
* ExtractLeftMostRangeTableIndex extracts the range table index of the left-most
* leaf in a join tree.
*/
static bool
bool
ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex)
{
bool walkerResult = false;
@ -213,40 +214,6 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex)
}
/*
* ExtractRightMostRangeTableIndex extracts the range table index of the right-most
* leaf in a join tree.
*/
static bool
ExtractRightMostRangeTableIndex(Node *node, int *rangeTableIndex)
{
bool walkerResult = false;
Assert(node != NULL);
if (IsA(node, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) node;
walkerResult = ExtractRightMostRangeTableIndex(joinExpr->rarg, rangeTableIndex);
}
else if (IsA(node, RangeTblRef))
{
RangeTblRef *rangeTableRef = (RangeTblRef *) node;
*rangeTableIndex = rangeTableRef->rtindex;
walkerResult = true;
}
else
{
walkerResult = expression_tree_walker(node, ExtractRightMostRangeTableIndex,
rangeTableIndex);
}
return walkerResult;
}
/*
* JoinOnColumns determines whether two columns are joined by a given join clause list.
*/
@ -363,84 +330,22 @@ JoinOrderList(List *tableEntryList, List *joinClauseList)
}
static JoinType *
FindJoinTypeBetweenTables(List *joinExprList, List *leftTableIdxList, uint32 rtableIdx)
{
uint32 ltableIdx;
foreach_int(ltableIdx, leftTableIdxList)
{
JoinTypeContext joinTypeContext = {
.ltableIdx = ltableIdx,
.rtableIdx = rtableIdx,
.joinType = NULL,
};
JoinExpr *joinExpr = NULL;
foreach_ptr(joinExpr, joinExprList)
{
JoinTypeJoinExprWalker((Node *) joinExpr, &joinTypeContext);
if (joinTypeContext.joinType)
{
return joinTypeContext.joinType;
}
}
}
return NULL;
}
/*
* JoinTypeJoinExprWalker finds join type between range table indexes.
* Only handles left recursive join trees.
* TableEntryByRangeTableId returns TableEntry from given list with specified range table id.
*/
static bool
JoinTypeJoinExprWalker(Node *node, JoinTypeContext *joinTypeContext)
static TableEntry *
TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx)
{
if (node == NULL)
TableEntry *tableEntry = NULL;
foreach_ptr(tableEntry, tableEntryList)
{
return false;
}
if (IsA(node, JoinExpr))
if (tableEntry->rangeTableId == rangeTableIdx)
{
JoinExpr *joinExpr = (JoinExpr *) node;
if (IsA(joinExpr->rarg, RangeTblRef) &&
((RangeTblRef *) joinExpr->rarg)->rtindex == joinTypeContext->rtableIdx)
{
/*
* we found right table entry, then we need to find rightmost entry of left arg
* of the join tree
*/
int ltableIdx = 0;
ExtractRightMostRangeTableIndex(joinExpr->larg, &ltableIdx);
if (joinTypeContext->ltableIdx == ltableIdx)
{
/*
* if we have semi join here, we can safely convert them to inner joins. We already
* checked planner actually planned those nodes as inner joins
*/
if (joinExpr->jointype == JOIN_SEMI)
{
joinExpr->jointype = JOIN_INNER;
}
else if (joinExpr->jointype == JOIN_ANTI)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"complex joins are only supported when all distributed tables are "
"co-located and joined on their distribution columns")));
}
joinTypeContext->joinType = &(joinExpr->jointype);
return true;
}
return tableEntry;
}
}
return expression_tree_walker(node, JoinTypeJoinExprWalker, joinTypeContext);
ereport(ERROR, errmsg("Unexpected table entry!"));
}
@ -449,64 +354,63 @@ JoinTypeJoinExprWalker(Node *node, JoinTypeContext *joinTypeContext)
* applicable join rules for the nodes in the list.
*/
List *
FixedJoinOrderList(List *tableEntryList, List *joinClauseList,
List *joinExprList)
FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext)
{
List *joinOrderList = NIL;
List *joinedTableList = NIL;
bool firstTable = true;
JoinOrderNode *currentJoinNode = NULL;
JoinOrderNode *nextJoinNode = NULL;
int tableCount = list_length(tableEntryList);
int tableIdx;
for (tableIdx = 1; tableIdx < tableCount; tableIdx++)
/* we donot support anti joins as ruleutils files cannot deparse JOIN_ANTI */
if (JoinInfoContextHasAntiJoin(joinInfoContext))
{
TableEntry *currentTable = (TableEntry *) list_nth(tableEntryList, tableIdx - 1);
TableEntry *nextTable = (TableEntry *) list_nth(tableEntryList, tableIdx);
if (firstTable)
{
/* add first table into joinedtable list */
joinedTableList = lappend(joinedTableList, currentTable);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"complex joins are only supported when all distributed "
"tables are joined on their distribution columns with "
"equal operator")));
}
/*
* if we cannot find join type between tables, then it is cartesian product. We can use JOIN_INNER,
* which will be executed as cartesian product.
* converts semi joins in given join info context to inner joins as we checked
* at query_pushdown_planning that planner did not actually plan any semi join.
* ruleutils files cannot deparse JOIN_SEMI, so we convert those to JOIN_INNER.
*/
JoinType joinType = JOIN_INNER;
List *joinedTableIdxList = RangeTableIdList(joinedTableList);
JoinType *applicableJoinType = FindJoinTypeBetweenTables(joinExprList,
joinedTableIdxList,
nextTable->rangeTableId);
if (applicableJoinType)
{
joinType = *applicableJoinType;
}
ConvertSemiToInnerInJoinInfoContext(joinInfoContext);
List *joinOrderList = NIL;
List *joinedTableList = NIL;
JoinOrderNode *nextJoinNode = NULL;
/* fetch joininfo */
JoinInfo *firstJoinInfo = (JoinInfo *) list_nth(joinInfoContext->joinInfoList, 0);
/* add first table into joinedtable list */
TableEntry *firstTable = TableEntryByRangeTableId(tableEntryList,
firstJoinInfo->ltableIdx);
joinedTableList = lappend(joinedTableList, firstTable);
if (firstTable)
{
/* create join node for the first table */
JoinRuleType joinRule = JOIN_RULE_INVALID_FIRST;
Oid relationId = currentTable->relationId;
uint32 tableId = currentTable->rangeTableId;
Oid relationId = firstTable->relationId;
uint32 tableId = firstTable->rangeTableId;
Var *partitionColumn = PartitionColumn(relationId, tableId);
char partitionMethod = PartitionMethod(relationId);
currentJoinNode = MakeJoinOrderNode(currentTable, joinRule,
JoinOrderNode *currentJoinNode = MakeJoinOrderNode(firstTable, joinRule,
list_make1(partitionColumn),
partitionMethod,
currentTable, joinType);
firstTable,
firstJoinInfo->joinType);
joinOrderList = lappend(joinOrderList, currentJoinNode);
firstTable = false;
}
JoinInfo *joinInfo = NULL;
foreach_ptr(joinInfo, joinInfoContext->joinInfoList)
{
TableEntry *nextTable = TableEntryByRangeTableId(tableEntryList,
joinInfo->rtableIdx);
bool passJoinClauseDirectly = true;
nextJoinNode = EvaluateJoinRules(joinedTableList,
currentJoinNode,
nextTable,
joinClauseList, joinType);
joinInfo->joinQualifierList,
joinInfo->joinType,
passJoinClauseDirectly);
if (nextJoinNode == NULL)
{
@ -535,6 +439,48 @@ FixedJoinOrderList(List *tableEntryList, List *joinClauseList,
}
/*
* JoinInfoContextHasAntiJoin returns true if given join info context contains
* an anti join.
*/
static bool
JoinInfoContextHasAntiJoin(JoinInfoContext *joinOrderContext)
{
JoinInfo *joinInfo = NULL;
foreach_ptr(joinInfo, joinOrderContext->joinInfoList)
{
if (joinInfo->joinType == JOIN_ANTI)
{
return true;
}
}
return false;
}
/*
* ConvertSemiToInnerInJoinInfoContext converts semi joins in given join info context
* to inner joins as we checked at query_pushdown_planning that planner did not actually
* plan any semi join. ruleutils files cannot deparse JOIN_SEMI, so we convert those
* to JOIN_INNER.
*/
static bool
ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext)
{
JoinInfo *joinInfo = NULL;
foreach_ptr(joinInfo, joinOrderContext->joinInfoList)
{
if (joinInfo->joinType == JOIN_SEMI)
{
joinInfo->joinType = JOIN_INNER;
}
}
return false;
}
/*
* JoinOrderForTable creates a join order whose first element is the given first
* table. To determine each subsequent element in the join order, the function
@ -587,10 +533,13 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause
JoinType joinType = JOIN_INNER;
/* evaluate all join rules for this pending table */
bool passJoinClauseDirectly = false;
JoinOrderNode *pendingJoinNode = EvaluateJoinRules(joinedTableList,
currentJoinNode,
pendingTable,
joinClauseList, joinType);
joinClauseList,
joinType,
passJoinClauseDirectly);
if (pendingJoinNode == NULL)
{
@ -833,16 +782,6 @@ JoinTypeName(JoinType jointype)
return "FULL";
}
case JOIN_SEMI:
{
return "SEMI";
}
case JOIN_ANTI:
{
return "ANTI";
}
default:
/* Shouldn't come here, but protect from buggy code. */
@ -937,7 +876,7 @@ TableEntryListDifference(List *lhsTableList, List *rhsTableList)
static JoinOrderNode *
EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
TableEntry *candidateTable, List *joinClauseList,
JoinType joinType)
JoinType joinType, bool passJoinClauseDirectly)
{
JoinOrderNode *nextJoinNode = NULL;
uint32 lowestValidIndex = JOIN_RULE_INVALID_FIRST + 1;
@ -961,6 +900,7 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
nextJoinNode = (*ruleEvalFunction)(currentJoinNode,
candidateTable,
(passJoinClauseDirectly) ? joinClauseList :
applicableJoinClauses,
joinType);
@ -978,7 +918,8 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
Assert(nextJoinNode != NULL);
nextJoinNode->joinType = joinType;
nextJoinNode->joinClauseList = applicableJoinClauses;
nextJoinNode->joinClauseList = (passJoinClauseDirectly) ? joinClauseList :
applicableJoinClauses;
return nextJoinNode;
}

View File

@ -85,15 +85,20 @@ static bool ExtractFromExpressionWalker(Node *node,
QualifierWalkerContext *walkerContext);
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
static List * AddMultiCollectNodes(List *tableNodeList);
static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList);
static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList, bool
passJoinClauseDirectly);
static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId);
static MultiSelect * MultiSelectNode(List *whereClauseList);
static MultiSelect * MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly);
static bool IsSelectClause(Node *clause);
static JoinInfoContext * FetchJoinOrderContext(FromExpr *fromExpr);
static bool JoinInfoWalker(Node *node, JoinInfoContext *joinInfoContext);
/* Local functions forward declarations for applying joins */
static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode,
JoinRuleType ruleType, List *partitionColumnList,
JoinType joinType, List *joinClauseList);
JoinType joinType, List *joinClauseList,
bool passJoinClauseDirectly);
static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType);
static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
List *partitionColumnList, JoinType joinType,
@ -572,6 +577,7 @@ MultiNodeTree(Query *queryTree)
List *collectTableList = NIL;
MultiNode *joinTreeNode = NULL;
MultiNode *currentTopNode = NULL;
bool passQualClauseDirectly = false;
/* verify we can perform distributed planning on this query */
DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
@ -661,14 +667,16 @@ MultiNodeTree(Query *queryTree)
if (FindNodeMatchingCheckFunction((Node *) queryTree->jointree, IsOuterJoinExpr))
{
/* consider outer join qualifications as well */
List *allRestrictionClauseList = QualifierList(queryTree->jointree);
joinClauseList = JoinClauseList(allRestrictionClauseList);
List *joinExprList = JoinExprList(queryTree->jointree);
/* pass join clauses directly into fix join order */
JoinInfoContext *joinInfoContext = FetchJoinOrderContext(queryTree->jointree);
/* where clause should not contain join clause */
whereClauseList = joinInfoContext->baseQualifierList;
/* we simply donot commute joins as we have at least 1 outer join */
joinOrderList = FixedJoinOrderList(tableEntryList, joinClauseList,
joinExprList);
joinOrderList = FixedJoinOrderList(tableEntryList, joinInfoContext);
passQualClauseDirectly = true;
}
else
{
@ -680,7 +688,8 @@ MultiNodeTree(Query *queryTree)
}
/* build join tree using the join order and collected tables */
joinTreeNode = MultiJoinTree(joinOrderList, collectTableList);
joinTreeNode = MultiJoinTree(joinOrderList, collectTableList,
passQualClauseDirectly);
currentTopNode = joinTreeNode;
}
@ -688,7 +697,7 @@ MultiNodeTree(Query *queryTree)
Assert(currentTopNode != NULL);
/* build select node if the query has selection criteria */
MultiSelect *selectNode = MultiSelectNode(whereClauseList);
MultiSelect *selectNode = MultiSelectNode(whereClauseList, passQualClauseDirectly);
if (selectNode != NULL)
{
SetChild((MultiUnaryNode *) selectNode, currentTopNode);
@ -714,6 +723,127 @@ MultiNodeTree(Query *queryTree)
}
/*
* FetchJoinOrderContext returns all join info for given node.
*/
static JoinInfoContext *
FetchJoinOrderContext(FromExpr *fromExpr)
{
/* we do not allow cartesian product for outer joins */
Assert(fromExpr->fromlist && list_length(fromExpr->fromlist) == 1);
JoinInfoContext *joinInfoContext = palloc0(sizeof(JoinInfoContext));
JoinInfoWalker((Node *) fromExpr, joinInfoContext);
/* only leftmost table will have valid(ltableIdx != 0) ltableIdx */
int leftMostTableIdx = 0;
ExtractLeftMostRangeTableIndex((Node *) fromExpr, &leftMostTableIdx);
Assert(list_length(joinInfoContext->joinInfoList) > 0);
JoinInfo *leftMostJoinInfo = list_nth(joinInfoContext->joinInfoList, 0);
leftMostJoinInfo->ltableIdx = leftMostTableIdx;
return joinInfoContext;
}
/*
* JoinInfoWalker descends into given node and pushes all join info into
* joinInfoContext.
*/
static bool
JoinInfoWalker(Node *node, JoinInfoContext *joinInfoContext)
{
if (node == NULL)
{
return false;
}
/* process the deepest node first */
bool walkerResult = expression_tree_walker(node, JoinInfoWalker,
(void *) joinInfoContext);
/*
* Get qualifier lists of join and from expression nodes. Note that in the
* case of subqueries, PostgreSQL can skip simplifying, flattening and
* making ANDs implicit. If qualifiers node is not a list, then we run these
* preprocess routines on qualifiers node.
*/
if (IsA(node, JoinExpr))
{
JoinExpr *joinExpression = (JoinExpr *) node;
if (!(IsA(joinExpression->rarg, RangeTblRef) || IsA(joinExpression->rarg,
FromExpr)))
{
ereport(WARNING, (errmsg("unexpected node in joininfowalker")));
}
Node *joinQualifiersNode = joinExpression->quals;
JoinType joinType = joinExpression->jointype;
RangeTblRef *rightTableRef = NULL;
if (IsA(joinExpression->rarg, RangeTblRef))
{
rightTableRef = (RangeTblRef *) joinExpression->rarg;
}
else
{
Assert(IsA(joinExpression->rarg, FromExpr));
FromExpr *fromExpr = (FromExpr *) joinExpression->rarg;
Assert(list_length(fromExpr->fromlist) == 1);
rightTableRef = (RangeTblRef *) list_nth(fromExpr->fromlist, 0);
}
List *joinQualifierList = NIL;
if (joinQualifiersNode != NULL)
{
if (IsA(joinQualifiersNode, List))
{
joinQualifierList = (List *) joinQualifiersNode;
}
else
{
/* this part of code only run for subqueries */
Node *joinClause = eval_const_expressions(NULL, joinQualifiersNode);
joinClause = (Node *) canonicalize_qual((Expr *) joinClause, false);
joinQualifierList = make_ands_implicit((Expr *) joinClause);
}
}
JoinInfo *joinInfo = palloc0(sizeof(JoinInfo));
joinInfo->joinType = joinType;
joinInfo->rtableIdx = rightTableRef->rtindex;
joinInfo->joinQualifierList = joinQualifierList;
joinInfoContext->joinInfoList = lappend(joinInfoContext->joinInfoList, joinInfo);
}
else if (IsA(node, FromExpr))
{
List *fromQualifierList = NIL;
FromExpr *fromExpression = (FromExpr *) node;
Node *fromQualifiersNode = fromExpression->quals;
if (fromQualifiersNode != NULL)
{
if (IsA(fromQualifiersNode, List))
{
fromQualifierList = (List *) fromQualifiersNode;
}
else
{
/* this part of code only run for subqueries */
Node *fromClause = eval_const_expressions(NULL, fromQualifiersNode);
fromClause = (Node *) canonicalize_qual((Expr *) fromClause, false);
fromQualifierList = make_ands_implicit((Expr *) fromClause);
}
joinInfoContext->baseQualifierList =
list_concat(joinInfoContext->baseQualifierList, fromQualifierList);
}
}
return walkerResult;
}
/*
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
* a call to the read_intermediate_result function.
@ -1568,7 +1698,7 @@ AddMultiCollectNodes(List *tableNodeList)
* this tree after every table in the list has been joined.
*/
static MultiNode *
MultiJoinTree(List *joinOrderList, List *collectTableList)
MultiJoinTree(List *joinOrderList, List *collectTableList, bool passJoinClauseDirectly)
{
MultiNode *currentTopNode = NULL;
ListCell *joinOrderCell = NULL;
@ -1600,7 +1730,8 @@ MultiJoinTree(List *joinOrderList, List *collectTableList)
(MultiNode *) collectNode,
joinRuleType, partitionColumnList,
joinType,
joinClauseList);
joinClauseList,
passJoinClauseDirectly);
/* the new join node becomes the top of our join tree */
currentTopNode = newJoinNode;
@ -1649,7 +1780,7 @@ CollectNodeForTable(List *collectTableList, uint32 rangeTableId)
* not have any select clauses, the function return null.
*/
static MultiSelect *
MultiSelectNode(List *whereClauseList)
MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly)
{
List *selectClauseList = NIL;
MultiSelect *selectNode = NULL;
@ -1658,7 +1789,7 @@ MultiSelectNode(List *whereClauseList)
foreach(whereClauseCell, whereClauseList)
{
Node *whereClause = (Node *) lfirst(whereClauseCell);
if (IsSelectClause(whereClause))
if (passWhereClauseDirectly || IsSelectClause(whereClause))
{
selectClauseList = lappend(selectClauseList, whereClause);
}
@ -1949,7 +2080,8 @@ pull_var_clause_default(Node *node)
*/
static MultiNode *
ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
List *partitionColumnList, JoinType joinType, List *joinClauseList)
List *partitionColumnList, JoinType joinType, List *joinClauseList,
bool passJoinClauseDirectly)
{
List *leftTableIdList = OutputTableIdList(leftNode);
List *rightTableIdList = OutputTableIdList(rightNode);
@ -1966,7 +2098,8 @@ ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
/* call the join rule application function to create the new join node */
RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType);
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList,
joinType, applicableJoinClauses);
joinType, (passJoinClauseDirectly) ?
joinClauseList : applicableJoinClauses);
if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin))
{

View File

@ -113,6 +113,10 @@ static bool ContainsReferencesToRelidsWalker(Node *node,
RelidsReferenceWalkerContext *context);
static bool HasRightRecursiveJoin(FromExpr *fromExpr);
static bool RightRecursiveJoinExprWalker(Node *node, void *context);
static bool HasCartesianJoin(FromExpr *fromExpr);
static bool CartesianJoinExprWalker(Node *node, void *context);
static bool HasLateralJoin(JoinRestrictionContext *joinRestrictionContext);
/*
* ShouldUseSubqueryPushDown determines whether it's desirable to use
@ -153,7 +157,6 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
return true;
}
/*
* We check if postgres planned any semi joins, MultiNodeTree doesn't
* support these so we fail. Postgres is able to replace some IN/ANY
@ -183,16 +186,6 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
return true;
}
/*
* Some unsupported join clauses in logical planner
* may be supported by subquery pushdown planner.
*/
List *qualifierList = QualifierList(rewrittenQuery->jointree);
if (DeferErrorIfUnsupportedClause(qualifierList) != NULL)
{
return true;
}
/*
* some unsupported outer joins in logical planner
* may be supported by pushdown planner.
@ -207,12 +200,39 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
/*
* join order planner only handles left recursive join trees (except inner joins,
* which are commutative)
* which are commutative).
*/
if (HasRightRecursiveJoin(rewrittenQuery->jointree))
{
return true;
}
/*
* join order planner cannot handle cartesian joins when query tree contains outer
* join.
*/
if (HasCartesianJoin(rewrittenQuery->jointree))
{
return true;
}
/*
* join order planner cannot handle lateral join trees for outer joins.
*/
if (HasLateralJoin(plannerRestrictionContext->joinRestrictionContext))
{
return true;
}
}
/*
* Some unsupported join clauses in logical planner
* may be supported by subquery pushdown planner.
*/
List *qualifierList = QualifierList(rewrittenQuery->jointree);
if (DeferErrorIfUnsupportedClause(qualifierList) != NULL)
{
return true;
}
/* check if the query has a window function and it is safe to pushdown */
@ -227,9 +247,8 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
/*
* HasRightRecursiveJoin returns true if join tree contains any right recursive join.
* That method should be removed when we support right recursive outer joins at join
* order planner.
* HasRightRecursiveJoin returns true if it finds right recursive part
* in given join tree.
*/
static bool
HasRightRecursiveJoin(FromExpr *fromExpr)
@ -248,8 +267,7 @@ HasRightRecursiveJoin(FromExpr *fromExpr)
/*
* RightRecursiveJoinExprWalker returns true if it finds right recursive join
* in given join tree.
* RightRecursiveJoinExprWalker is helper method for HasRightRecursiveJoin.
*/
static bool
RightRecursiveJoinExprWalker(Node *node, void *context)
@ -263,7 +281,7 @@ RightRecursiveJoinExprWalker(Node *node, void *context)
{
JoinExpr *joinExpr = (JoinExpr *) node;
if (joinExpr->rarg && IsA(joinExpr->rarg, JoinExpr))
if (IsA(joinExpr->rarg, JoinExpr))
{
return true;
}
@ -273,6 +291,74 @@ RightRecursiveJoinExprWalker(Node *node, void *context)
}
/*
* HasCartesianJoin returns true if join tree contains any cartesian join.
*/
static bool
HasCartesianJoin(FromExpr *fromExpr)
{
if (fromExpr && list_length(fromExpr->fromlist) > 1)
{
return true;
}
JoinExpr *joinExpr = NULL;
foreach_ptr(joinExpr, fromExpr->fromlist)
{
if (CartesianJoinExprWalker((Node *) joinExpr, NULL))
{
return true;
}
}
return false;
}
/*
* CartesianJoinExprWalker is helper method for HasCartesianJoin.
*/
static bool
CartesianJoinExprWalker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}
if (IsA(node, FromExpr))
{
FromExpr *fromExpr = (FromExpr *) node;
if (list_length(fromExpr->fromlist) > 1)
{
return true;
}
}
return expression_tree_walker(node, CartesianJoinExprWalker, NULL);
}
/*
* HasLateralJoin returns true if join restriction context contain lateral join.
*/
static bool
HasLateralJoin(JoinRestrictionContext *joinRestrictionContext)
{
JoinRestriction *joinRestriction = NULL;
foreach_ptr(joinRestriction, joinRestrictionContext->joinRestrictionList)
{
if (joinRestriction->plannerInfo && joinRestriction->plannerInfo->hasLateralRTEs)
{
return true;
}
}
return false;
}
/*
* JoinTreeContainsSubquery returns true if the input query contains any subqueries
* in the join tree (e.g., FROM clause).

View File

@ -83,15 +83,22 @@ typedef struct JoinOrderNode
} JoinOrderNode;
/*
* JoinTypeContext stores jointype between given rangetable indexes
*/
typedef struct JoinTypeContext
/* JoinInfoContext stores joinInfo list and base qualifications */
typedef struct JoinInfoContext
{
List *baseQualifierList;
List *joinInfoList;
} JoinInfoContext;
/* JoinInfoContext stores joinInfo list and base qualifications */
typedef struct JoinInfo
{
JoinType joinType;
uint32 ltableIdx;
uint32 rtableIdx;
JoinType *joinType;
} JoinTypeContext;
List *joinQualifierList;
} JoinInfo;
/* Config variables managed via guc.c */
@ -102,9 +109,8 @@ extern bool EnableSingleHashRepartitioning;
/* Function declaration for determining table join orders */
extern List * JoinExprList(FromExpr *fromExpr);
extern List * JoinOrderList(List *rangeTableEntryList, List *joinClauseList);
extern List * FixedJoinOrderList(List *rangeTableEntryList, List *joinClauseList,
List *joinExprList);
extern const char * JoinTypeName(JoinType jointype);
extern List * FixedJoinOrderList(List *rangeTableEntryList,
JoinInfoContext *joinInfoContext);
extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId,
Node *joinClause);
extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId,
@ -122,6 +128,7 @@ extern Var * DistPartitionKey(Oid relationId);
extern Var * DistPartitionKeyOrError(Oid relationId);
extern char PartitionMethod(Oid relationId);
extern char TableReplicationModel(Oid relationId);
extern bool ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex);
#endif /* MULTI_JOIN_ORDER_H */

View File

@ -1242,19 +1242,19 @@ WHERE o_orderkey IN (1, 2)
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 3
Tasks Shown: One of 3
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Nested Loop
Join Filter: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey)
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
-> Seq Scan on orders_hash_partitioned_630003 orders_hash_partitioned
Filter: (o_orderkey = ANY ('{1,2}'::integer[]))
-> Materialize
-> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
-> Bitmap Heap Scan on lineitem_hash_partitioned_630007 lineitem_hash_partitioned
Recheck Cond: (l_orderkey = ANY ('{2,3}'::integer[]))
-> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004
-> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630007
Index Cond: (l_orderkey = ANY ('{2,3}'::integer[]))
(16 rows)

View File

@ -845,7 +845,6 @@ SET client_min_messages TO WARNING;
raw_events_first.user_id
FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- same as the above with INNER JOIN
INSERT INTO agg_events (user_id)
SELECT
@ -867,7 +866,6 @@ DETAIL: Cartesian products are currently unsupported
raw_events_first.user_id
FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$
@ -903,7 +901,6 @@ FROM
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.user_id = raw_events_second.value_1
WHERE
raw_events_first.user_id = 10;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- same as the above with INNER JOIN
-- we support this with route to coordinator
SELECT coordinator_plan($Q$

View File

@ -354,7 +354,8 @@ FROM (
GROUP BY user_id
) AS shard_union
ORDER BY user_lastseen DESC;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
-- not pushable since lateral join is not on the partition key
INSERT INTO agg_results_third (user_id, agg_time, value_2_agg)
SELECT
@ -382,8 +383,8 @@ FROM (
GROUP BY user_id
) AS shard_union
ORDER BY user_lastseen DESC;
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
-- not pushable since lateral join is not on the partition key
INSERT INTO agg_results_third (user_id, agg_time, value_2_agg)
SELECT
@ -411,7 +412,8 @@ FROM (
GROUP BY user_id
) AS shard_union
ORDER BY user_lastseen DESC;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
---------------------------------------------------------------------
---------------------------------------------------------------------
-- Count the number of distinct users_table who are in segment X and Y and Z
@ -515,14 +517,14 @@ SELECT user_id, value_2 FROM users_table WHERE
value_1 = 101
AND value_2 >= 5
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND user_id!=users_table.user_id);
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- not pushable since the join is not the partition key
INSERT INTO agg_results_third(user_id, value_2_agg)
SELECT user_id, value_2 FROM users_table WHERE
value_1 = 101
AND value_2 >= 5
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=101 AND value_3 > 100 AND event_type=users_table.user_id);
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
---------------------------------------------------------------------
---------------------------------------------------------------------
-- Customers who have done X and Y, and satisfy other customer specific criteria

View File

@ -74,7 +74,7 @@ EXPLAIN (COSTS OFF)
SELECT l1.l_quantity FROM lineitem l1, lineitem l2
WHERE l1.l_orderkey = l2.l_orderkey AND l1.l_quantity > 5;
DEBUG: Router planner cannot handle multi-shard select queries
LOG: join order: [ "lineitem" ][ local partition join "lineitem" ]
LOG: join order: [ "lineitem" ][ local partition join(INNER) "lineitem" ]
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1]
QUERY PLAN
@ -91,7 +91,7 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM lineitem, orders
WHERE (l_orderkey = o_orderkey AND l_quantity > 5)
OR (l_orderkey = o_orderkey AND l_quantity < 10);
LOG: join order: [ "lineitem" ][ local partition join "orders" ]
LOG: join order: [ "lineitem" ][ local partition join(INNER) "orders" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -106,7 +106,7 @@ ERROR: complex joins are only supported when all distributed tables are joined
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem_hash" ]
LOG: join order: [ "orders" ][ dual partition join(INNER) "lineitem_hash" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -118,7 +118,7 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem_hash" ]
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders_hash, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
LOG: join order: [ "orders_hash" ][ local partition join(INNER) "lineitem_hash" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -130,7 +130,7 @@ LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
EXPLAIN (COSTS OFF)
SELECT count(*) FROM customer_hash, nation
WHERE c_nationkey = n_nationkey;
LOG: join order: [ "customer_hash" ][ reference join "nation" ]
LOG: join order: [ "customer_hash" ][ reference join(INNER) "nation" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -143,7 +143,7 @@ LOG: join order: [ "customer_hash" ][ reference join "nation" ]
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, lineitem, customer_append
WHERE o_custkey = l_partkey AND o_custkey = c_nationkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer_append" ]
LOG: join order: [ "orders" ][ dual partition join(INNER) "lineitem" ][ dual partition join(INNER) "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -156,7 +156,7 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, customer_hash
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
LOG: join order: [ "orders" ][ dual partition join(INNER) "customer_hash" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -169,7 +169,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ]
LOG: join order: [ "orders_hash" ][ dual partition join(INNER) "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -194,7 +194,7 @@ JOIN (
JOIN events_table USING (user_id)
WHERE event_type = 5
) AS some_users ON (some_users.user_id = bar.user_id);
LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ]
LOG: join order: [ "users_table" ][ local partition join(INNER) "events_table" ][ local partition join(INNER) "users_table" ][ local partition join(INNER) "events_table" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate

View File

@ -53,7 +53,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ]
LOG: join order: [ "orders" ][ local partition join(INNER) "lineitem" ][ dual partition join(INNER) "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
Sort
@ -97,7 +97,7 @@ GROUP BY
c_comment
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ]
LOG: join order: [ "orders" ][ local partition join(INNER) "lineitem" ][ dual partition join(INNER) "customer_append" ][ reference join(INNER) "nation" ]
QUERY PLAN
---------------------------------------------------------------------
Sort
@ -137,7 +137,7 @@ WHERE
AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK')
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate
@ -157,7 +157,7 @@ WHERE
c_custkey = o_custkey
GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ]
LOG: join order: [ "lineitem" ][ local partition join(INNER) "orders" ][ dual partition join(INNER) "part_append" ][ dual partition join(INNER) "customer_append" ]
QUERY PLAN
---------------------------------------------------------------------
HashAggregate

View File

@ -48,7 +48,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ reference join "customer" ][ local partition join "lineitem" ]
LOG: join order: [ "orders" ][ reference join(INNER) "customer" ][ local partition join(INNER) "lineitem" ]
QUERY PLAN
---------------------------------------------------------------------
Sort
@ -90,7 +90,7 @@ GROUP BY
c_comment
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ reference join "customer" ][ reference join "nation" ][ local partition join "lineitem" ]
LOG: join order: [ "orders" ][ reference join(INNER) "customer" ][ reference join(INNER) "nation" ][ local partition join(INNER) "lineitem" ]
QUERY PLAN
---------------------------------------------------------------------
Sort
@ -132,7 +132,7 @@ WHERE
AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK')
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ reference join "part" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "part" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate

View File

@ -810,7 +810,7 @@ FROM
WHERE
colocated_table_test.value_1 = reference_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ]
value_1
---------------------------------------------------------------------
1
@ -824,7 +824,7 @@ FROM
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ]
value_2
---------------------------------------------------------------------
1
@ -838,7 +838,7 @@ FROM
WHERE
reference_table_test.value_1 = colocated_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ]
value_2
---------------------------------------------------------------------
1
@ -853,7 +853,7 @@ FROM
WHERE
colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY colocated_table_test.value_2;
LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ]
LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join(INNER) "reference_table_test" ][ dual partition join(INNER) "colocated_table_test" ]
value_2
---------------------------------------------------------------------
1
@ -870,7 +870,7 @@ FROM
WHERE
colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ local partition join "colocated_table_test_2" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ][ local partition join(INNER) "colocated_table_test_2" ]
value_2
---------------------------------------------------------------------
1
@ -885,7 +885,7 @@ FROM
WHERE
colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ][ dual partition join(INNER) "colocated_table_test_2" ]
value_2
---------------------------------------------------------------------
1
@ -899,7 +899,7 @@ FROM
WHERE
colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1
ORDER BY 1;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ]
LOG: join order: [ "colocated_table_test" ][ reference join(INNER) "reference_table_test" ][ dual partition join(INNER) "colocated_table_test_2" ]
value_2
---------------------------------------------------------------------
1

View File

@ -148,7 +148,7 @@ EXPLAIN (COSTS OFF)
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
@ -166,7 +166,7 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
2 | (1,2) | foo | 8 | (1,2) | foo

View File

@ -11,7 +11,7 @@ SET citus.log_multi_join_order = true;
-- join on int column, and be empty
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.pk = repartition_udt_other.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
(0 rows)
@ -20,7 +20,7 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
2 | (1,2) | foo | 8 | (1,2) | foo

View File

@ -11,7 +11,7 @@ SET citus.log_multi_join_order = true;
-- join on int column, and be empty.
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.pk = repartition_udt_other.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
(0 rows)
@ -20,7 +20,7 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
2 | (1,2) | foo | 8 | (1,2) | foo

View File

@ -174,7 +174,7 @@ FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey = 5 or r_custkey > 15;
LOG: join order: [ "multi_outer_join_left" ][ reference join "multi_outer_join_right_reference" ]
LOG: join order: [ "multi_outer_join_left" ][ reference join(INNER) "multi_outer_join_right_reference" ]
min | max
---------------------------------------------------------------------
5 | 5
@ -277,7 +277,7 @@ SELECT
count(*)
FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_nationkey = r_nationkey);
LOG: join order: [ "multi_outer_join_left" ][ dual partition join "multi_outer_join_right" ]
LOG: join order: [ "multi_outer_join_left" ][ dual partition join(LEFT) "multi_outer_join_right" ]
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- Anti-join should return customers for which there is no row in the right table
@ -312,7 +312,7 @@ FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
WHERE
r_custkey = 21 or r_custkey < 10;
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
LOG: join order: [ "multi_outer_join_left" ][ local partition join(INNER) "multi_outer_join_right" ]
min | max
---------------------------------------------------------------------
21 | 21

View File

@ -13,7 +13,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -41,7 +41,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -69,7 +69,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -96,7 +96,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ cartesian product reference join "supplier" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -124,7 +124,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -152,7 +152,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -180,7 +180,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join "supplier" ][ dual partition join "part_append" ]
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -208,7 +208,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ][ reference join "supplier" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

View File

@ -151,7 +151,7 @@ EXPLAIN (COSTS OFF)
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
@ -169,7 +169,7 @@ SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
LOG: join order: [ "repartition_udt" ][ dual partition join(INNER) "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
---------------------------------------------------------------------
2 | (1,2) | foo | 8 | (1,2) | foo

View File

@ -682,5 +682,5 @@ SELECT user_id, value_2 FROM users_table WHERE
AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type=1 AND value_3 > 1 AND test_join_function(events_table.user_id, users_table.user_id))
ORDER BY 1 DESC, 2 DESC
LIMIT 3;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DROP FUNCTION test_join_function(int,int);

View File

@ -1,3 +1,5 @@
CREATE SCHEMA non_colocated_outer_joins;
SET search_path TO non_colocated_outer_joins;
CREATE TABLE t1(col1 INT, col2 INT);
SELECT create_distributed_table('t1', 'col1');
create_distributed_table
@ -350,28 +352,19 @@ LOG: join order: [ "t2" ][ dual partition join(LEFT) "t1" ][ dual partition joi
| | 15 | 15 | 15 | 15
(5 rows)
-- join order planner cannot handle semi joins
SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
------------------------- wrong results below, should not be supported if there exists nonsimple join clause
-- join order planner cannot handle anti join between tables with simple join clause
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- join order planner cannot handle left outer join between tables with nonsimple join clause
-- where constraint(t1.col1 IS NULL or t2.col2 IS NULL) is considered as join constraint
-- join order planner handles left outer join between tables with nonsimple join or where clause
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1) WHERE (t1.col1 IS NULL or t2.col2 IS NULL) ORDER BY 1,2,3,4;
LOG: join order: [ "t1" ][ dual partition join(LEFT) "t2" ]
col1 | col2 | col1 | col2
---------------------------------------------------------------------
1 | 1 | |
2 | 2 | |
3 | 3 | |
4 | 4 | |
5 | 5 | |
(5 rows)
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1 and t1.col1 < 0) ORDER BY 1,2,3,4;
LOG: join order: [ "t1" ][ dual partition join(LEFT) "t2" ]
col1 | col2 | col1 | col2
---------------------------------------------------------------------
@ -387,20 +380,29 @@ LOG: join order: [ "t1" ][ dual partition join(LEFT) "t2" ]
10 | 10 | |
(10 rows)
-- join constraint(t1.col1 < 0) is considered as where constraint
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1 and t1.col1 < 0) ORDER BY 1,2,3,4;
LOG: join order: [ "t1" ][ dual partition join(LEFT) "t2" ]
col1 | col2 | col1 | col2
---------------------------------------------------------------------
1 | 1 | |
2 | 2 | |
3 | 3 | |
4 | 4 | |
5 | 5 | |
6 | 6 | 6 | 6
7 | 7 | 7 | 7
8 | 8 | 8 | 8
9 | 9 | 9 | 9
10 | 10 | 10 | 10
(10 rows)
-- join order planner cannot handle semi joins
SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- join order planner cannot handle anti join
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DROP SCHEMA non_colocated_outer_joins CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table t1
drop cascades to table t2
drop cascades to table t3
RESET client_min_messages;
RESET citus.log_multi_join_order;
RESET citus.enable_repartition_joins;

View File

@ -385,14 +385,15 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
18
(1 row)
-- supported by join order planner
-- not supported because we join t3 (inner rel of the anti join) with a column
-- of reference table, not with the distribution column of the other distributed
-- table (t2)
SELECT COUNT(*) FROM
ref_1 t1
JOIN dist_1 t2
ON (t1.a = t2.a)
WHERE NOT EXISTS (SELECT * FROM dist_1 t3 WHERE t1.a = a);
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- supported because the semi join is performed based on distribution keys
-- of the distributed tables
SELECT COUNT(*) FROM

View File

@ -201,6 +201,7 @@ test: local_table_join
test: local_dist_join_mixed
test: citus_local_dist_joins
test: recurring_outer_join
test: non_colocated_outer_joins
test: pg_dump
# ---------

View File

@ -1,3 +1,6 @@
CREATE SCHEMA non_colocated_outer_joins;
SET search_path TO non_colocated_outer_joins;
CREATE TABLE t1(col1 INT, col2 INT);
SELECT create_distributed_table('t1', 'col1');
INSERT INTO t1 SELECT i, i FROM generate_series(1,10) i;
@ -68,6 +71,11 @@ SELECT t1.*, t2.*, t3.* FROM t1 FULL JOIN t2 ON (t1.col2 = t2.col2) INNER JOIN t
SELECT t1.*, t2.*, t3.* FROM t1 FULL JOIN t2 ON (t1.col2 = t2.col1) INNER JOIN t3 ON (t3.col2 = t1.col1) ORDER BY 1,2,3,4,5,6;
SELECT t1.*, t2.*, t3.* FROM t1 FULL JOIN t2 ON (t1.col2 = t2.col1) INNER JOIN t3 ON (t3.col2 = t2.col1) ORDER BY 1,2,3,4,5,6;
-- join order planner handles left outer join between tables with nonsimple join or where clause
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1) WHERE (t1.col1 IS NULL or t2.col2 IS NULL) ORDER BY 1,2,3,4;
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1 and t1.col1 < 0) ORDER BY 1,2,3,4;
-- join order planner cannot handle semi joins
SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
@ -75,19 +83,14 @@ SELECT t1.* FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDE
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
SELECT t2.* FROM t2 WHERE EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
------------------------- wrong results below, should not be supported if there exists nonsimple join clause
-- join order planner cannot handle anti join between tables with simple join clause
-- join order planner cannot handle anti join
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
SELECT t1.* FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col1 = t2.col1) ORDER BY 1,2;
SELECT t2.* FROM t2 WHERE NOT EXISTS (SELECT * FROM t1 WHERE t1.col2 = t2.col2) ORDER BY 1,2;
-- join order planner cannot handle left outer join between tables with nonsimple join clause
-- where constraint(t1.col1 IS NULL or t2.col2 IS NULL) is considered as join constraint
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1) WHERE (t1.col1 IS NULL or t2.col2 IS NULL) ORDER BY 1,2,3,4;
-- join constraint(t1.col1 < 0) is considered as where constraint
SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON (t1.col1 = t2.col1 and t1.col1 < 0) ORDER BY 1,2,3,4;
DROP SCHEMA non_colocated_outer_joins CASCADE;
RESET client_min_messages;
RESET citus.log_multi_join_order;
RESET citus.enable_repartition_joins;

View File

@ -160,7 +160,9 @@ SELECT COUNT(*) FROM ref_1 LEFT JOIN (dist_1 t1 LEFT JOIN dist_1 t2 USING (a)) q
ON (t1.a = t2.a)
WHERE t1.a IN (SELECT a FROM dist_1 t3);
-- supported by join order planner
-- not supported because we join t3 (inner rel of the anti join) with a column
-- of reference table, not with the distribution column of the other distributed
-- table (t2)
SELECT COUNT(*) FROM
ref_1 t1
JOIN dist_1 t2