mirror of https://github.com/citusdata/citus.git
LocalJoin
parent
3797092ae4
commit
9775b96afd
|
@ -31,6 +31,11 @@
|
|||
- [ ] PostprocessAlterTableStmtAttachPartition()
|
||||
- [x] TargetListOnPartitionColumn()
|
||||
- [ ] PartitionColumnForPushedDownSubquery()
|
||||
- [ ] CoPartitionedTables()
|
||||
- [x] LocalJoin()
|
||||
- [ ] SinglePartitionJoin()
|
||||
- [ ] ApplySinglePartitionJoin()
|
||||
- [ ] MultiJoinTree()
|
||||
|
||||
# query pushdown planner
|
||||
- [x] RestrictionEquivalenceForPartitionKeys()
|
||||
|
|
|
@ -97,7 +97,8 @@ static JoinOrderNode * CartesianProduct(JoinOrderNode *joinNode,
|
|||
JoinType joinType);
|
||||
static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry,
|
||||
JoinRuleType joinRuleType,
|
||||
List *partitionColumnList, char partitionMethod,
|
||||
List *partitionColumnListList,
|
||||
char partitionMethod,
|
||||
TableEntry *anchorTable);
|
||||
|
||||
|
||||
|
@ -343,11 +344,18 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause
|
|||
/* create join node for the first table */
|
||||
Oid firstRelationId = firstTable->relationId;
|
||||
uint32 firstTableId = firstTable->rangeTableId;
|
||||
Var *firstPartitionColumn = PartitionColumn(firstRelationId, firstTableId);
|
||||
List *firstPartitionColumnListList = NIL;
|
||||
Var *partitionColumn = NULL;
|
||||
List *partitionColumnList = PartitionColumns(firstRelationId, firstTableId);
|
||||
foreach_ptr(partitionColumn, partitionColumnList)
|
||||
{
|
||||
firstPartitionColumnListList = lappend(
|
||||
firstPartitionColumnListList, list_make1(partitionColumn));
|
||||
}
|
||||
char firstPartitionMethod = PartitionMethod(firstRelationId);
|
||||
|
||||
JoinOrderNode *firstJoinNode = MakeJoinOrderNode(firstTable, firstJoinRule,
|
||||
list_make1(firstPartitionColumn),
|
||||
firstPartitionColumnListList,
|
||||
firstPartitionMethod,
|
||||
firstTable);
|
||||
|
||||
|
@ -829,7 +837,7 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
return NULL;
|
||||
}
|
||||
return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN,
|
||||
currentJoinNode->partitionColumnList,
|
||||
currentJoinNode->partitionColumnListList,
|
||||
currentJoinNode->partitionMethod,
|
||||
currentJoinNode->anchorTable);
|
||||
}
|
||||
|
@ -881,7 +889,7 @@ CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candid
|
|||
return NULL;
|
||||
}
|
||||
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN,
|
||||
currentJoinNode->partitionColumnList,
|
||||
currentJoinNode->partitionColumnListList,
|
||||
currentJoinNode->partitionMethod,
|
||||
currentJoinNode->anchorTable);
|
||||
}
|
||||
|
@ -905,8 +913,8 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
{
|
||||
Oid relationId = candidateTable->relationId;
|
||||
uint32 tableId = candidateTable->rangeTableId;
|
||||
Var *candidatePartitionColumn = PartitionColumn(relationId, tableId);
|
||||
List *currentPartitionColumnList = currentJoinNode->partitionColumnList;
|
||||
List *candidatePartitionColumnList = PartitionColumns(relationId, tableId);
|
||||
List *currentPartitionColumnListList = currentJoinNode->partitionColumnListList;
|
||||
char candidatePartitionMethod = PartitionMethod(relationId);
|
||||
char currentPartitionMethod = currentJoinNode->partitionMethod;
|
||||
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
|
||||
|
@ -926,14 +934,40 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumnList,
|
||||
candidatePartitionColumn,
|
||||
applicableJoinClauses);
|
||||
if (!joinOnPartitionColumns)
|
||||
/*
|
||||
* Reference tables cannot do a local join on the distribution column, they
|
||||
* do reference joins.
|
||||
*/
|
||||
if (list_length(candidatePartitionColumnList) == 0 ||
|
||||
list_length(currentPartitionColumnListList) == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the number of partition columns don't match then we cannot do a local
|
||||
* join.
|
||||
*/
|
||||
if (list_length(candidatePartitionColumnList) != list_length(
|
||||
currentPartitionColumnListList))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Var *candidatePartitionColumn = NULL;
|
||||
List *currentPartitionColumnList = NIL;
|
||||
forboth_ptr(candidatePartitionColumn, candidatePartitionColumnList,
|
||||
currentPartitionColumnList, currentPartitionColumnListList)
|
||||
{
|
||||
bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumnList,
|
||||
candidatePartitionColumn,
|
||||
applicableJoinClauses);
|
||||
if (!joinOnPartitionColumns)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* shard interval lists must have 1-1 matching for local joins */
|
||||
bool coPartitionedTables = CoPartitionedTables(currentAnchorTable->relationId,
|
||||
relationId);
|
||||
|
@ -949,11 +983,20 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
* subsequent joins colocated with this candidate table to correctly be recognized as
|
||||
* a local join as well.
|
||||
*/
|
||||
currentPartitionColumnList = list_append_unique(currentPartitionColumnList,
|
||||
candidatePartitionColumn);
|
||||
ListCell *candidatePartitionColumnCell = NULL;
|
||||
ListCell *currentPartitionColumnListCell = NULL;
|
||||
forboth(candidatePartitionColumnCell, candidatePartitionColumnList,
|
||||
currentPartitionColumnListCell, currentPartitionColumnListList)
|
||||
{
|
||||
candidatePartitionColumn = lfirst(candidatePartitionColumnCell);
|
||||
currentPartitionColumnList = lfirst(currentPartitionColumnListCell);
|
||||
lfirst(currentPartitionColumnListCell) =
|
||||
list_append_unique(currentPartitionColumnList,
|
||||
candidatePartitionColumn);
|
||||
}
|
||||
|
||||
JoinOrderNode *nextJoinNode = MakeJoinOrderNode(candidateTable, LOCAL_PARTITION_JOIN,
|
||||
currentPartitionColumnList,
|
||||
currentPartitionColumnListList,
|
||||
currentPartitionMethod,
|
||||
currentAnchorTable);
|
||||
|
||||
|
@ -973,7 +1016,7 @@ static JoinOrderNode *
|
|||
SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
||||
List *applicableJoinClauses, JoinType joinType)
|
||||
{
|
||||
List *currentPartitionColumnList = currentJoinNode->partitionColumnList;
|
||||
List *currentPartitionColumnListList = currentJoinNode->partitionColumnListList;
|
||||
char currentPartitionMethod = currentJoinNode->partitionMethod;
|
||||
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
|
||||
JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType;
|
||||
|
@ -981,8 +1024,6 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
|
||||
Oid relationId = candidateTable->relationId;
|
||||
uint32 tableId = candidateTable->rangeTableId;
|
||||
Var *candidatePartitionColumn = PartitionColumn(relationId, tableId);
|
||||
char candidatePartitionMethod = PartitionMethod(relationId);
|
||||
|
||||
/* outer joins are not supported yet */
|
||||
if (IS_OUTER_JOIN(joinType))
|
||||
|
@ -1000,8 +1041,17 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (list_length(currentPartitionColumnListList) > 1)
|
||||
{
|
||||
/*
|
||||
* TODO: Implement single partition join for multi column distributed
|
||||
* tables.
|
||||
*/
|
||||
return NULL;
|
||||
}
|
||||
OpExpr *joinClause =
|
||||
SinglePartitionJoinClause(currentPartitionColumnList, applicableJoinClauses);
|
||||
SinglePartitionJoinClause(currentPartitionColumnListList,
|
||||
applicableJoinClauses);
|
||||
if (joinClause != NULL)
|
||||
{
|
||||
if (currentPartitionMethod == DISTRIBUTE_BY_HASH)
|
||||
|
@ -1016,29 +1066,41 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
}
|
||||
|
||||
return MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN,
|
||||
currentPartitionColumnList,
|
||||
currentPartitionColumnListList,
|
||||
currentPartitionMethod,
|
||||
currentAnchorTable);
|
||||
}
|
||||
else
|
||||
{
|
||||
return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN,
|
||||
currentPartitionColumnList,
|
||||
currentPartitionColumnListList,
|
||||
currentPartitionMethod,
|
||||
currentAnchorTable);
|
||||
}
|
||||
}
|
||||
|
||||
char candidatePartitionMethod = PartitionMethod(relationId);
|
||||
|
||||
/* evaluate re-partitioning the current table only if the rule didn't apply above */
|
||||
if (candidatePartitionMethod != DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
List *candidatePartitionColumnList = PartitionColumns(relationId, tableId);
|
||||
if (list_length(currentPartitionColumnListList) > 1)
|
||||
{
|
||||
/*
|
||||
* TODO: Implement single partition join for multi column distributed
|
||||
* tables.
|
||||
*/
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new unique list (set) with the partition column of the candidate table
|
||||
* to check if a single repartition join will work for this table. When it works
|
||||
* the set is retained on the MultiJoinNode for later local join verification.
|
||||
*/
|
||||
List *candidatePartitionColumnList = list_make1(candidatePartitionColumn);
|
||||
joinClause = SinglePartitionJoinClause(candidatePartitionColumnList,
|
||||
List *candidatePartitionColumnListList = list_make1(candidatePartitionColumnList);
|
||||
joinClause = SinglePartitionJoinClause(candidatePartitionColumnListList,
|
||||
applicableJoinClauses);
|
||||
if (joinClause != NULL)
|
||||
{
|
||||
|
@ -1055,7 +1117,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
|
||||
return MakeJoinOrderNode(candidateTable,
|
||||
SINGLE_HASH_PARTITION_JOIN,
|
||||
candidatePartitionColumnList,
|
||||
candidatePartitionColumnListList,
|
||||
candidatePartitionMethod,
|
||||
candidateTable);
|
||||
}
|
||||
|
@ -1063,7 +1125,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
{
|
||||
return MakeJoinOrderNode(candidateTable,
|
||||
SINGLE_RANGE_PARTITION_JOIN,
|
||||
candidatePartitionColumnList,
|
||||
candidatePartitionColumnListList,
|
||||
candidatePartitionMethod,
|
||||
candidateTable);
|
||||
}
|
||||
|
@ -1080,13 +1142,19 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
* clause exists, the function returns NULL.
|
||||
*/
|
||||
OpExpr *
|
||||
SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses)
|
||||
SinglePartitionJoinClause(List *partitionColumnListList, List *applicableJoinClauses)
|
||||
{
|
||||
if (list_length(partitionColumnList) == 0)
|
||||
if (list_length(partitionColumnListList) == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: Support multi column distributed tables.
|
||||
*/
|
||||
Assert(list_length(partitionColumnListList) == 1);
|
||||
List *partitionColumnList = linitial(partitionColumnListList);
|
||||
|
||||
Var *partitionColumn = NULL;
|
||||
foreach_ptr(partitionColumn, partitionColumnList)
|
||||
{
|
||||
|
@ -1210,7 +1278,7 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
{
|
||||
/* Because of the cartesian product, anchor table information got lost */
|
||||
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT,
|
||||
currentJoinNode->partitionColumnList,
|
||||
currentJoinNode->partitionColumnListList,
|
||||
currentJoinNode->partitionMethod,
|
||||
NULL);
|
||||
}
|
||||
|
@ -1222,14 +1290,14 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
|||
/* Constructs and returns a join-order node with the given arguments */
|
||||
JoinOrderNode *
|
||||
MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType,
|
||||
List *partitionColumnList, char partitionMethod,
|
||||
List *partitionColumnListList, char partitionMethod,
|
||||
TableEntry *anchorTable)
|
||||
{
|
||||
JoinOrderNode *joinOrderNode = palloc0(sizeof(JoinOrderNode));
|
||||
joinOrderNode->tableEntry = tableEntry;
|
||||
joinOrderNode->joinRuleType = joinRuleType;
|
||||
joinOrderNode->joinType = JOIN_INNER;
|
||||
joinOrderNode->partitionColumnList = partitionColumnList;
|
||||
joinOrderNode->partitionColumnListList = partitionColumnListList;
|
||||
joinOrderNode->partitionMethod = partitionMethod;
|
||||
joinOrderNode->joinClauseList = NIL;
|
||||
joinOrderNode->anchorTable = anchorTable;
|
||||
|
|
|
@ -62,7 +62,7 @@ typedef struct QualifierWalkerContext
|
|||
|
||||
/* Function pointer type definition for apply join rule functions */
|
||||
typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *joinClauses);
|
||||
|
||||
typedef bool (*CheckNodeFunc)(Node *);
|
||||
|
@ -94,38 +94,40 @@ static bool IsSelectClause(Node *clause);
|
|||
|
||||
/* Local functions forward declarations for applying joins */
|
||||
static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode,
|
||||
JoinRuleType ruleType, List *partitionColumnList,
|
||||
JoinRuleType ruleType, List *partitionColumnListList,
|
||||
JoinType joinType, List *joinClauseList);
|
||||
static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType);
|
||||
static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *joinClauses);
|
||||
static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *joinClauses);
|
||||
static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode,
|
||||
MultiNode *rightNode,
|
||||
List *partitionColumnList,
|
||||
List *partitionColumnListList,
|
||||
JoinType joinType,
|
||||
List *applicableJoinClauses);
|
||||
static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode,
|
||||
MultiNode *rightNode,
|
||||
List *partitionColumnList,
|
||||
List *partitionColumnListList,
|
||||
JoinType joinType,
|
||||
List *applicableJoinClauses);
|
||||
static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList,
|
||||
JoinType joinType,
|
||||
List *joinClauses);
|
||||
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList,
|
||||
JoinType joinType,
|
||||
List *joinClauses);
|
||||
static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode,
|
||||
MultiNode *rightNode,
|
||||
List *partitionColumnList,
|
||||
List *partitionColumnListList,
|
||||
JoinType joinType,
|
||||
List *joinClauses);
|
||||
static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *joinClauses);
|
||||
|
||||
|
||||
|
@ -1663,7 +1665,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause
|
|||
{
|
||||
JoinRuleType joinRuleType = joinOrderNode->joinRuleType;
|
||||
JoinType joinType = joinOrderNode->joinType;
|
||||
List *partitionColumnList = joinOrderNode->partitionColumnList;
|
||||
List *partitionColumnListList = joinOrderNode->partitionColumnListList;
|
||||
List *joinClauseList = joinOrderNode->joinClauseList;
|
||||
|
||||
/*
|
||||
|
@ -1672,7 +1674,8 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause
|
|||
*/
|
||||
MultiNode *newJoinNode = ApplyJoinRule(currentTopNode,
|
||||
(MultiNode *) collectNode,
|
||||
joinRuleType, partitionColumnList,
|
||||
joinRuleType,
|
||||
partitionColumnListList,
|
||||
joinType,
|
||||
joinClauseList);
|
||||
|
||||
|
@ -2025,7 +2028,7 @@ pull_var_clause_default(Node *node)
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
|
||||
List *partitionColumnList, JoinType joinType, List *joinClauseList)
|
||||
List *partitionColumnListList, JoinType joinType, List *joinClauseList)
|
||||
{
|
||||
List *leftTableIdList = OutputTableIdList(leftNode);
|
||||
List *rightTableIdList = OutputTableIdList(rightNode);
|
||||
|
@ -2041,7 +2044,8 @@ ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
|
|||
|
||||
/* call the join rule application function to create the new join node */
|
||||
RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType);
|
||||
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList,
|
||||
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode,
|
||||
partitionColumnListList,
|
||||
joinType, applicableJoinClauses);
|
||||
|
||||
if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin))
|
||||
|
@ -2096,7 +2100,7 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
|
||||
|
@ -2118,7 +2122,7 @@ ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
|
||||
|
@ -2139,7 +2143,7 @@ ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
|
||||
|
@ -2160,11 +2164,11 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiJoin *joinNode =
|
||||
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType,
|
||||
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnListList, joinType,
|
||||
applicableJoinClauses);
|
||||
|
||||
joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN;
|
||||
|
@ -2179,11 +2183,11 @@ ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
*/
|
||||
static MultiNode *
|
||||
ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiJoin *joinNode =
|
||||
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType,
|
||||
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnListList, joinType,
|
||||
applicableJoinClauses);
|
||||
|
||||
joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN;
|
||||
|
@ -2199,10 +2203,11 @@ ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
*/
|
||||
static MultiJoin *
|
||||
ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
Var *partitionColumn = linitial(partitionColumnList);
|
||||
Assert(list_length(partitionColumnListList) == 1);
|
||||
Var *partitionColumn = linitial(linitial(partitionColumnListList));
|
||||
uint32 partitionTableId = partitionColumn->varno;
|
||||
|
||||
/* create all operator structures up front */
|
||||
|
@ -2215,7 +2220,7 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
* column against the join clause's columns. If one of the columns matches,
|
||||
* we introduce a (re-)partition operator for the other column.
|
||||
*/
|
||||
OpExpr *joinClause = SinglePartitionJoinClause(partitionColumnList,
|
||||
OpExpr *joinClause = SinglePartitionJoinClause(partitionColumnListList,
|
||||
applicableJoinClauses);
|
||||
Assert(joinClause != NULL);
|
||||
|
||||
|
@ -2339,7 +2344,7 @@ ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
|
|||
/* Creates a cartesian product node that joins the left and the right node. */
|
||||
static MultiNode *
|
||||
ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
|
||||
List *partitionColumnList, JoinType joinType,
|
||||
List *partitionColumnListList, JoinType joinType,
|
||||
List *applicableJoinClauses)
|
||||
{
|
||||
MultiCartesianProduct *cartesianNode = CitusMakeNode(MultiCartesianProduct);
|
||||
|
|
|
@ -54,7 +54,6 @@ typedef struct ListCellAndListWrapper
|
|||
(((var) = lfirst(var ## CellDoNotUse)) || true); \
|
||||
var ## CellDoNotUse = lnext_compat(l, var ## CellDoNotUse))
|
||||
|
||||
|
||||
/*
|
||||
* foreach_int -
|
||||
* a convenience macro which loops through an int list without needing a
|
||||
|
@ -80,6 +79,35 @@ typedef struct ListCellAndListWrapper
|
|||
(((var) = lfirst_oid(var ## CellDoNotUse)) || true); \
|
||||
var ## CellDoNotUse = lnext_compat(l, var ## CellDoNotUse))
|
||||
|
||||
/*
|
||||
* forboth_ptr -
|
||||
* a convenience macro which loops through a pointer list without needing a
|
||||
* ListCell, just a declared pointer variable to store the pointer of the
|
||||
* cell in.
|
||||
*
|
||||
* How it works:
|
||||
* - A ListCell is declared with the name {var}CellDoNotUse and used
|
||||
* throughout the for loop using ## to concat.
|
||||
* - To assign to var it needs to be done in the condition of the for loop,
|
||||
* because we cannot use the initializer since a ListCell* variable is
|
||||
* declared there.
|
||||
* - || true is used to always enter the loop when cell is not null even if
|
||||
* var is NULL.
|
||||
*/
|
||||
#define forboth_ptr(var1, l1, var2, l2) \
|
||||
for (ListCell \
|
||||
*(var1 ## CellDoNotUse) = list_head(l1) \
|
||||
, *(var2 ## CellDoNotUse) = list_head(l2) \
|
||||
; \
|
||||
(var1 ## CellDoNotUse) != NULL \
|
||||
&& (((var1) = lfirst(var1 ## CellDoNotUse)) || true) \
|
||||
&& (var2 ## CellDoNotUse) != NULL \
|
||||
&& (((var2) = lfirst(var2 ## CellDoNotUse)) || true) \
|
||||
; \
|
||||
var1 ## CellDoNotUse = lnext_compat(l1, var1 ## CellDoNotUse) \
|
||||
, var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \
|
||||
)
|
||||
|
||||
/*
|
||||
* foreach_ptr_append -
|
||||
* a convenience macro which loops through a pointer List and can append list
|
||||
|
|
|
@ -75,7 +75,7 @@ typedef struct JoinOrderNode
|
|||
* We keep track of all unique partition columns in the relation to correctly find
|
||||
* join clauses that can be applied locally.
|
||||
*/
|
||||
List *partitionColumnList;
|
||||
List *partitionColumnListList;
|
||||
|
||||
char partitionMethod;
|
||||
List *joinClauseList; /* not relevant for the first table */
|
||||
|
|
|
@ -252,51 +252,16 @@ SELECT * FROM (
|
|||
FROM t2 JOIN t3 ON t2.id = t3.id
|
||||
) foo
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
id | id2 | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 | 1 | 1
|
||||
1 | 1 | 2 | 1
|
||||
1 | 1 | 4 | 1
|
||||
2 | 3 | 4 | 2
|
||||
2 | 3 | 4 | 4
|
||||
2 | 3 | 5 | 2
|
||||
2 | 3 | 5 | 4
|
||||
2 | 4 | 5 | 2
|
||||
2 | 4 | 5 | 4
|
||||
(9 rows)
|
||||
|
||||
ERROR: the query contains a join that requires repartitioning
|
||||
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
|
||||
EXPLAIN
|
||||
SELECT * FROM (
|
||||
SELECT t2.id, t2.id2, a, b
|
||||
FROM t2 JOIN t3 ON t2.id = t3.id
|
||||
) foo
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
-> Distributed Subplan XXX_1
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16)
|
||||
Task Count: 4
|
||||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Merge Join (cost=285.08..607.40 rows=20808 width=16)
|
||||
Merge Cond: (t2.id = t3.id)
|
||||
-> Sort (cost=142.54..147.64 rows=2040 width=12)
|
||||
Sort Key: t2.id
|
||||
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
|
||||
-> Sort (cost=142.54..147.64 rows=2040 width=8)
|
||||
Sort Key: t3.id
|
||||
-> Seq Scan on t3_27905508 t3 (cost=0.00..30.40 rows=2040 width=8)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Sort (cost=59.83..62.33 rows=1000 width=16)
|
||||
Sort Key: intermediate_result.id, intermediate_result.id2, intermediate_result.a, intermediate_result.b
|
||||
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=16)
|
||||
(22 rows)
|
||||
|
||||
ERROR: the query contains a join that requires repartitioning
|
||||
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
|
||||
-- Cannot pushdown if not joining on both distribution columns
|
||||
SELECT * FROM (
|
||||
SELECT t2.id, t2.id2, a, b
|
||||
|
|
Loading…
Reference in New Issue