use planner restriction context

* to fetch join and base restrictions properly,
  * to push down restrictions properly.
outer-join-noncolocated-dist-tables
aykutbozkurt 2023-01-16 20:35:37 +03:00
parent 454857f3a7
commit 222f11e23e
19 changed files with 720 additions and 327 deletions

View File

@ -59,6 +59,7 @@
#include "optimizer/optimizer.h"
#include "optimizer/plancat.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planner.h"
#include "optimizer/planmain.h"
#include "utils/builtins.h"
@ -126,7 +127,8 @@ static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue);
Node *distributionKeyValue, int
fastPathRelId);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@ -144,6 +146,7 @@ distributed_planner(Query *parse,
bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL;
int fastPathRelId = InvalidOid;
List *rangeTableList = ExtractRangeTableEntryList(parse);
@ -247,7 +250,11 @@ distributed_planner(Query *parse,
{
if (fastPathRouterQuery)
{
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(parse->rtable);
fastPathRelId = rangeTableEntry->relid;
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue,
fastPathRelId);
}
else
{
@ -686,7 +693,7 @@ IsUpdateOrDelete(Query *query)
*/
static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue)
Node *distributionKeyValue, int fastPathRelId)
{
FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext;
@ -694,6 +701,9 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
planContext->plannerRestrictionContext->fastPathRestrictionContext->
fastPathRouterQuery = true;
planContext->plannerRestrictionContext->fastPathRestrictionContext->
distRelId = fastPathRelId;
if (distributionKeyValue == NULL)
{
/* nothing to record */
@ -710,6 +720,8 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
planContext->boundParams);
RelabelPlannerRestrictionContext(planContext->plannerRestrictionContext);
return CreateDistributedPlannedStmt(planContext);
}
@ -955,6 +967,42 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
}
/*
* ReplanAfterQueryModification replans modified originalquery to update plannercontext
* properly. Returns modified query.
*/
Query *
ReplanAfterQueryModification(Query *originalQuery, ParamListInfo boundParams)
{
Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false;
PlannerRestrictionContext *currentPlannerRestrictionContext =
CurrentPlannerRestrictionContext();
/* reset the current planner restrictions context */
ResetPlannerRestrictionContext(currentPlannerRestrictionContext);
/*
* We force standard_planner to treat partitioned tables as regular tables
* by clearing the inh flag on RTEs. We already did this at the start of
* distributed_planner, but on a copy of the original query, so we need
* to do it again here.
*/
AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
setPartitionedTablesInherited);
/*
* Some relations may have been removed from the query, but we can skip
* AssignRTEIdentities since we currently do not rely on RTE identities
* being contiguous.
*/
standard_planner(newQuery, NULL, 0, boundParams);
return newQuery;
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
@ -1119,30 +1167,7 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
"joined on their distribution columns")));
}
Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false;
PlannerRestrictionContext *currentPlannerRestrictionContext =
CurrentPlannerRestrictionContext();
/* reset the current planner restrictions context */
ResetPlannerRestrictionContext(currentPlannerRestrictionContext);
/*
* We force standard_planner to treat partitioned tables as regular tables
* by clearing the inh flag on RTEs. We already did this at the start of
* distributed_planner, but on a copy of the original query, so we need
* to do it again here.
*/
AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
setPartitionedTablesInherited);
/*
* Some relations may have been removed from the query, but we can skip
* AssignRTEIdentities since we currently do not rely on RTE identities
* being contiguous.
*/
standard_planner(newQuery, NULL, 0, boundParams);
Query *newQuery = ReplanAfterQueryModification(originalQuery, boundParams);
/* overwrite the old transformed query with the new transformed query */
*query = *newQuery;
@ -1874,6 +1899,27 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestriction->innerrelRelids = bms_copy(innerrel->relids);
joinRestriction->outerrelRelids = bms_copy(outerrel->relids);
Relids joinrelids = bms_union(innerrel->relids, outerrel->relids);
List *prevVals = NIL;
EquivalenceClass *eqclass = NULL;
foreach_ptr(eqclass, root->eq_classes)
{
prevVals = lappend_int(prevVals, eqclass->ec_has_const);
eqclass->ec_has_const = false;
}
joinRestrictionContext->generatedEcJoinRestrictInfoList =
generate_join_implied_equalities(
root,
joinrelids,
outerrel->relids,
innerrel);
int i;
for (i = 0; i < list_length(prevVals); i++)
{
EquivalenceClass *eqClass = list_nth(root->eq_classes, i);
eqClass->ec_has_const = list_nth_int(prevVals, i);
}
joinRestrictionContext->joinRestrictionList =
lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);

View File

@ -83,7 +83,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
Oid *
selectPartitionColumnTableId);
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
ParamListInfo boundParams);
ParamListInfo boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery);
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
@ -253,7 +255,8 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
* repartitioning.
*/
distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery,
boundParams);
boundParams,
plannerRestrictionContext);
}
return distributedPlan;
@ -360,6 +363,62 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
}
/*
* RelabelPlannerRestrictionContext relabels var nos inside restriction infos to 1.
*/
void
RelabelPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext)
{
List *relationRestrictionList =
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList;
if (plannerRestrictionContext->fastPathRestrictionContext &&
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue)
{
Const *distKeyVal =
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue;
Var *partitionColumn = PartitionColumn(
plannerRestrictionContext->fastPathRestrictionContext->distRelId, 1);
OpExpr *partitionExpression = MakeOpExpression(partitionColumn,
BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) partitionExpression);
Const *rightConst = (Const *) rightOp;
*rightConst = *distKeyVal;
RestrictInfo *fastpathRestrictInfo = makeNode(RestrictInfo);
fastpathRestrictInfo->can_join = false;
fastpathRestrictInfo->is_pushed_down = true;
fastpathRestrictInfo->clause = (Expr *) partitionExpression;
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
relationRestriction->relOptInfo = palloc0(sizeof(RelOptInfo));
relationRestriction->relOptInfo->baserestrictinfo = list_make1(
fastpathRestrictInfo);
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList =
list_make1(relationRestriction);
return;
}
RelationRestriction *relationRestriction = NULL;
foreach_ptr(relationRestriction, relationRestrictionList)
{
RelOptInfo *relOptInfo = relationRestriction->relOptInfo;
List *baseRestrictInfoList = relOptInfo->baserestrictinfo;
RestrictInfo *baseRestrictInfo = NULL;
foreach_ptr(baseRestrictInfo, baseRestrictInfoList)
{
List *varList = pull_var_clause_default((Node *) baseRestrictInfo->clause);
Var *var = NULL;
foreach_ptr(var, varList)
{
var->varno = 1;
var->varnosyn = 1;
}
}
}
}
/*
* CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries
* where the selected table is distributed and the inserted table is not.
@ -384,6 +443,8 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
RelabelPlannerRestrictionContext(plannerRestrictionContext);
bool allowRecursivePlanning = true;
DistributedPlan *distPlan = CreateDistributedPlan(planId, allowRecursivePlanning,
selectQuery,
@ -1476,7 +1537,8 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
* distributed table. The query plan can also be executed on a worker in MX.
*/
static DistributedPlan *
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
Query *insertSelectQuery = copyObject(parse);

View File

@ -25,19 +25,22 @@
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/shard_pruning.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "optimizer/optimizer.h"
#include "utils/builtins.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/pathnodes.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
/* Config variables managed via guc.c */
bool LogMultiJoinOrder = false; /* print join order as a debugging aid */
bool EnableSingleHashRepartitioning = false;
@ -55,7 +58,8 @@ static RuleEvalFunction RuleEvalFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join r
/* Local functions forward declarations */
static bool JoinExprListWalker(Node *node, List **joinList);
static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList,
List *joinClauseList);
List *joinRestrictInfoListList,
List *generatedEcJoinClauseList);
static List * BestJoinOrder(List *candidateJoinOrders);
static List * FewestOfJoinRuleType(List *candidateJoinOrders, JoinRuleType ruleType);
static uint32 JoinRuleTypeCount(List *joinOrder, JoinRuleType ruleTypeToCount);
@ -65,27 +69,39 @@ static uint32 LargeDataTransferLocation(List *joinOrder);
static List * TableEntryListDifference(List *lhsTableList, List *rhsTableList);
static bool ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext);
static bool JoinInfoContextHasAntiJoin(JoinInfoContext *joinOrderContext);
static List * FindJoinClauseForTables(List *joinRestrictInfoListList,
List *generatedEcJoinClauseList,
List *lhsTableIdList,
uint32 rhsTableId,
JoinType joinType);
static const char * JoinTypeName(JoinType jointype);
static List * ExtractPushdownJoinRestrictInfos(List *joinRestrictInfoList,
RestrictInfo *joinRestrictInfo, JoinType
joinType);
/* Local functions forward declarations for join evaluations */
static JoinOrderNode * EvaluateJoinRules(List *joinedTableList,
JoinOrderNode *currentJoinNode,
TableEntry *candidateTable,
List *joinClauseList,
JoinType joinType,
bool passJoinClauseDirectly);
List *joinRestrictInfoListList,
List *generatedEcJoinClauseList,
JoinType joinType);
static List * RangeTableIdList(List *tableList);
static TableEntry * TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx);
static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType);
static char * JoinRuleName(JoinRuleType ruleType);
static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static bool JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn,
List *joinClauseList);
static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode,
@ -290,7 +306,8 @@ NodeIsEqualsOpExpr(Node *node)
* least amount of data across the network, and returns this join order.
*/
List *
JoinOrderList(List *tableEntryList, List *joinClauseList)
JoinOrderList(List *tableEntryList, List *joinRestrictInfoListList,
List *generatedEcJoinClauseList, List *pseudoClauseList)
{
List *candidateJoinOrderList = NIL;
ListCell *tableEntryCell = NULL;
@ -301,7 +318,8 @@ JoinOrderList(List *tableEntryList, List *joinClauseList)
/* each candidate join order starts with a different table */
List *candidateJoinOrder = JoinOrderForTable(startingTable, tableEntryList,
joinClauseList);
joinRestrictInfoListList,
generatedEcJoinClauseList);
if (candidateJoinOrder != NULL)
{
@ -349,12 +367,90 @@ TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx)
}
/*
* ExtractPushdownJoinRestrictInfos extracts clauses, which are pushed down and treated like a normal filter,
* inside given join restriction infos.
*/
static List *
ExtractPushdownJoinRestrictInfos(List *restrictInfoListOfJoin,
RestrictInfo *joinRestrictInfo, JoinType joinType)
{
List *joinFilterRestrictInfoList = NIL;
/* left and right relids of the join restriction */
Bitmapset *joinRelids = bms_union(joinRestrictInfo->left_relids,
joinRestrictInfo->right_relids);
RestrictInfo *restrictInfo = NULL;
foreach_ptr(restrictInfo, restrictInfoListOfJoin)
{
if (!restrictInfo->can_join &&
(!IS_OUTER_JOIN(joinType) || RINFO_IS_PUSHED_DOWN(restrictInfo, joinRelids)))
{
joinFilterRestrictInfoList = lappend(joinFilterRestrictInfoList,
restrictInfo);
}
}
return joinFilterRestrictInfoList;
}
/*
* FindJoinClauseForTables finds join clause for given left hand side tables and
* right hand side table.
*/
static List *
FindJoinClauseForTables(List *joinRestrictInfoListList, List *generatedEcJoinClauseList,
List *lhsTableIdList, uint32 rhsTableId, JoinType joinType)
{
List *joinRestrictInfoList = NIL;
foreach_ptr(joinRestrictInfoList, joinRestrictInfoListList)
{
RestrictInfo *joinRestrictInfo = NULL;
foreach_ptr(joinRestrictInfo, joinRestrictInfoList)
{
Node *restrictClause = (Node *) joinRestrictInfo->clause;
if (joinRestrictInfo->can_join && IsApplicableJoinClause(lhsTableIdList,
rhsTableId,
restrictClause))
{
List *pushdownFakeRestrictInfoList = ExtractPushdownJoinRestrictInfos(
joinRestrictInfoList, joinRestrictInfo, joinType);
List *nonPushdownRestrictInfoList = list_difference(joinRestrictInfoList,
pushdownFakeRestrictInfoList);
List *nonPushdownRestrictClauseList =
ExtractRestrictClausesFromRestrictionInfoList(
nonPushdownRestrictInfoList);
return nonPushdownRestrictClauseList;
}
}
}
if (joinType == JOIN_INNER)
{
Node *ecClause = NULL;
foreach_ptr(ecClause, generatedEcJoinClauseList)
{
if (IsApplicableJoinClause(lhsTableIdList, rhsTableId, ecClause))
{
return list_make1(ecClause);
}
}
}
return NIL;
}
/*
* FixedJoinOrderList returns the best fixed join order according to
* applicable join rules for the nodes in the list.
*/
List *
FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext)
FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext,
List *joinRestrictInfoListList, List *generatedEcJoinClauseList,
List *pseudoClauseList)
{
/* we donot support anti joins as ruleutils files cannot deparse JOIN_ANTI */
if (JoinInfoContextHasAntiJoin(joinInfoContext))
@ -404,13 +500,12 @@ FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext)
TableEntry *nextTable = TableEntryByRangeTableId(tableEntryList,
joinInfo->rightTableIdx);
bool passJoinClauseDirectly = true;
nextJoinNode = EvaluateJoinRules(joinedTableList,
currentJoinNode,
nextTable,
joinInfo->joinQualifierList,
joinInfo->joinType,
passJoinClauseDirectly);
joinRestrictInfoListList,
generatedEcJoinClauseList,
joinInfo->joinType);
if (nextJoinNode == NULL)
{
@ -490,7 +585,8 @@ ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext)
* returns this list.
*/
static List *
JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClauseList)
JoinOrderForTable(TableEntry *firstTable, List *tableEntryList,
List *joinRestrictInfoListList, List *generatedEcJoinClauseList)
{
JoinRuleType firstJoinRule = JOIN_RULE_INVALID_FIRST;
int joinedTableCount = 1;
@ -533,13 +629,12 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause
JoinType joinType = JOIN_INNER;
/* evaluate all join rules for this pending table */
bool passJoinClauseDirectly = false;
JoinOrderNode *pendingJoinNode = EvaluateJoinRules(joinedTableList,
currentJoinNode,
pendingTable,
joinClauseList,
joinType,
passJoinClauseDirectly);
joinRestrictInfoListList,
generatedEcJoinClauseList,
joinType);
if (pendingJoinNode == NULL)
{
@ -872,40 +967,27 @@ TableEntryListDifference(List *lhsTableList, List *rhsTableList)
* next table, evaluates different join rules between the two tables, and finds
* the best join rule that applies. The function returns the applicable join
* order node which includes the join rule and the partition information.
*
* When we have only inner joins, we can commute the joins as we wish and it also
* does not matter if we merge or move join and where clauses. For query trees with
* only inner joins, `joinClauseList` contains join and where clauses combined so that
* we can push down some where clauses which are applicable as join clause, which is
* determined by `ApplicableJoinClauses`.
* When we have at least 1 outer join in a query tree, we cannot commute joins(that is
* why we have `FixedJoinOrderList`) or move join and where clauses as we wish because
* we would have incorrect results. We should pass join and where clauses separately while
* creating tasks. `joinClauseList` contains only join clauses when `passJoinClauseDirectly`
* is set true.
*/
static JoinOrderNode *
EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
TableEntry *candidateTable, List *joinClauseList,
JoinType joinType, bool passJoinClauseDirectly)
TableEntry *candidateTable, List *joinRestrictInfoListList,
List *generatedEcJoinClauseList, JoinType joinType)
{
JoinOrderNode *nextJoinNode = NULL;
uint32 lowestValidIndex = JOIN_RULE_INVALID_FIRST + 1;
uint32 highestValidIndex = JOIN_RULE_LAST - 1;
List *joinClauses = joinClauseList;
if (!passJoinClauseDirectly)
{
/*
* We first find all applicable join clauses between already joined tables
* and the candidate table.
*/
List *joinedTableIdList = RangeTableIdList(joinedTableList);
uint32 candidateTableId = candidateTable->rangeTableId;
joinClauses = ApplicableJoinClauses(joinedTableIdList,
candidateTableId,
joinClauseList);
}
/*
* We first find all applicable join clauses between already joined tables
* and the candidate table.
*/
List *joinedTableIdList = RangeTableIdList(joinedTableList);
uint32 candidateTableId = candidateTable->rangeTableId;
List *applicableJoinClauseList = FindJoinClauseForTables(joinRestrictInfoListList,
generatedEcJoinClauseList,
joinedTableIdList,
candidateTableId,
joinType);
/* we then evaluate all join rules in order */
for (uint32 ruleIndex = lowestValidIndex; ruleIndex <= highestValidIndex; ruleIndex++)
@ -915,7 +997,7 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
nextJoinNode = (*ruleEvalFunction)(currentJoinNode,
candidateTable,
joinClauses,
applicableJoinClauseList,
joinType);
/* break after finding the first join rule that applies */
@ -932,7 +1014,7 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode,
Assert(nextJoinNode != NULL);
nextJoinNode->joinType = joinType;
nextJoinNode->joinClauseList = joinClauses;
nextJoinNode->joinClauseList = applicableJoinClauseList;
return nextJoinNode;
}
@ -1500,27 +1582,30 @@ IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinCla
/*
* ApplicableJoinClauses finds all join clauses that apply between the given
* left table list and the right table, and returns these found join clauses.
* IsApplicableFalseConstantJoinClause returns true if it can find a constant false filter
* which is applied to right table and also at least one of the table in left tables.
*/
List *
ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList)
bool
IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32 rightTableId,
RestrictInfo *restrictInfo)
{
List *applicableJoinClauses = NIL;
/* find whether restrictinfo relids contain right table relid */
Relids restrictionRelIds = restrictInfo->required_relids;
bool hasRightTable = bms_is_member(rightTableId, restrictionRelIds);
/* make sure joinClauseList contains only join clauses */
joinClauseList = JoinClauseList(joinClauseList);
Node *joinClause = NULL;
foreach_ptr(joinClause, joinClauseList)
/* convert left table id list to bitmapset */
Relids leftTableRelIds = NULL;
int leftTableId = -1;
foreach_int(leftTableId, leftTableIdList)
{
if (IsApplicableJoinClause(leftTableIdList, rightTableId, joinClause))
{
applicableJoinClauses = lappend(applicableJoinClauses, joinClause);
}
leftTableRelIds = bms_add_member(leftTableRelIds, leftTableId);
}
return applicableJoinClauses;
/* find whether restrictinfo relids contain any of the left table relids */
Relids intersectLeftRelids = bms_intersect(restrictionRelIds, leftTableRelIds);
bool hasOneOfLeftTables = bms_num_members(intersectLeftRelids) > 0;
return hasRightTable && hasOneOfLeftTables;
}

View File

@ -169,9 +169,8 @@ typedef struct OrderByLimitReference
/* Local functions forward declarations */
static MultiSelect * AndSelectNode(MultiSelect *selectNode);
static MultiSelect * OrSelectNode(MultiSelect *selectNode);
static List * OrSelectClauseList(List *selectClauseList);
static MultiSelect * PushdownableSelectNode(MultiSelect *selectNode);
static MultiSelect * NonPushdownableSelectNode(MultiSelect *selectNode);
static void PushDownNodeLoop(MultiUnaryNode *currentNode);
static void PullUpCollectLoop(MultiCollect *collectNode);
static void AddressProjectSpecialConditions(MultiProject *projectNode);
@ -381,27 +380,29 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
if (selectNodeList != NIL)
{
MultiSelect *selectNode = (MultiSelect *) linitial(selectNodeList);
MultiSelect *andSelectNode = AndSelectNode(selectNode);
MultiSelect *orSelectNode = OrSelectNode(selectNode);
MultiSelect *pushdownableSelectNode = PushdownableSelectNode(selectNode);
MultiSelect *nonPushdownableSelectNode = NonPushdownableSelectNode(selectNode);
if (andSelectNode != NULL && orSelectNode != NULL)
if (pushdownableSelectNode != NULL && nonPushdownableSelectNode != NULL)
{
MultiNode *parentNode = ParentNode((MultiNode *) selectNode);
MultiNode *childNode = ChildNode((MultiUnaryNode *) selectNode);
Assert(UnaryOperator(parentNode));
SetChild((MultiUnaryNode *) parentNode, (MultiNode *) orSelectNode);
SetChild((MultiUnaryNode *) orSelectNode, (MultiNode *) andSelectNode);
SetChild((MultiUnaryNode *) andSelectNode, (MultiNode *) childNode);
SetChild((MultiUnaryNode *) parentNode,
(MultiNode *) nonPushdownableSelectNode);
SetChild((MultiUnaryNode *) nonPushdownableSelectNode,
(MultiNode *) pushdownableSelectNode);
SetChild((MultiUnaryNode *) pushdownableSelectNode, (MultiNode *) childNode);
}
else if (andSelectNode != NULL && orSelectNode == NULL)
else if (pushdownableSelectNode != NULL && nonPushdownableSelectNode == NULL)
{
andSelectNode = selectNode; /* no need to modify the tree */
pushdownableSelectNode = selectNode; /* no need to modify the tree */
}
if (andSelectNode != NULL)
if (pushdownableSelectNode != NULL)
{
PushDownNodeLoop((MultiUnaryNode *) andSelectNode);
PushDownNodeLoop((MultiUnaryNode *) pushdownableSelectNode);
}
}
@ -486,71 +487,44 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
/*
* AndSelectNode looks for AND clauses in the given select node. If they exist,
* the function returns these clauses in a new node. Otherwise, the function
* PushdownableSelectNode looks for pushdownable clauses in the given select node.
* If they exist, the function returns these clauses in a new node. Otherwise, the function
* returns null.
*/
static MultiSelect *
AndSelectNode(MultiSelect *selectNode)
PushdownableSelectNode(MultiSelect *selectNode)
{
MultiSelect *andSelectNode = NULL;
List *selectClauseList = selectNode->selectClauseList;
List *orSelectClauseList = OrSelectClauseList(selectClauseList);
MultiSelect *pushdownableSelectNode = NULL;
/* AND clauses are select clauses that are not OR clauses */
List *andSelectClauseList = list_difference(selectClauseList, orSelectClauseList);
if (andSelectClauseList != NIL)
if (selectNode->pushdownableSelectClauseList != NIL)
{
andSelectNode = CitusMakeNode(MultiSelect);
andSelectNode->selectClauseList = andSelectClauseList;
pushdownableSelectNode = CitusMakeNode(MultiSelect);
pushdownableSelectNode->selectClauseList =
selectNode->pushdownableSelectClauseList;
}
return andSelectNode;
return pushdownableSelectNode;
}
/*
* OrSelectNode looks for OR clauses in the given select node. If they exist,
* the function returns these clauses in a new node. Otherwise, the function
* PushdownableSelectNode looks for nonpushdownable clauses in the given select node.
* If they exist, the function returns these clauses in a new node. Otherwise, the function
* returns null.
*/
static MultiSelect *
OrSelectNode(MultiSelect *selectNode)
NonPushdownableSelectNode(MultiSelect *selectNode)
{
MultiSelect *orSelectNode = NULL;
List *selectClauseList = selectNode->selectClauseList;
List *orSelectClauseList = OrSelectClauseList(selectClauseList);
MultiSelect *nonPushdownableSelectNode = NULL;
if (orSelectClauseList != NIL)
if (selectNode->nonPushdownableSelectClauseList != NIL)
{
orSelectNode = CitusMakeNode(MultiSelect);
orSelectNode->selectClauseList = orSelectClauseList;
nonPushdownableSelectNode = CitusMakeNode(MultiSelect);
nonPushdownableSelectNode->selectClauseList =
selectNode->nonPushdownableSelectClauseList;
}
return orSelectNode;
}
/*
* OrSelectClauseList walks over the select clause list, and returns all clauses
* that have OR expressions in them.
*/
static List *
OrSelectClauseList(List *selectClauseList)
{
List *orSelectClauseList = NIL;
Node *selectClause = NULL;
foreach_ptr(selectClause, selectClauseList)
{
bool orClause = is_orclause(selectClause);
if (orClause)
{
orSelectClauseList = lappend(orSelectClauseList, selectClause);
}
}
return orSelectClauseList;
return nonPushdownableSelectNode;
}

View File

@ -33,7 +33,6 @@
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"
@ -41,6 +40,7 @@
#include "nodes/pathnodes.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h"
#include "optimizer/paths.h"
#include "optimizer/prep.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
@ -85,20 +85,20 @@ static bool ExtractFromExpressionWalker(Node *node,
QualifierWalkerContext *walkerContext);
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
static List * AddMultiCollectNodes(List *tableNodeList);
static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList,
bool passJoinClauseDirectly);
static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList);
static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId);
static MultiSelect * MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly);
static MultiSelect * MultiSelectNode(List *pushdownableClauseList,
List *nonPushdownableClauseList);
static bool IsSelectClause(Node *clause);
static JoinInfoContext * FetchJoinOrderContext(FromExpr *fromExpr);
static bool JoinInfoWalker(Node *node, JoinInfoContext *joinInfoContext);
static List * ExtractNonPushdownableJoinClauses(List *joinOrderList);
/* Local functions forward declarations for applying joins */
static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode,
JoinRuleType ruleType, List *partitionColumnList,
JoinType joinType, List *joinClauseList,
bool passJoinClauseDirectly);
JoinType joinType, List *joinClauseList);
static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType);
static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
List *partitionColumnList, JoinType joinType,
@ -157,7 +157,7 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
}
else
{
multiQueryNode = MultiNodeTree(queryTree);
multiQueryNode = MultiNodeTree(queryTree, plannerRestrictionContext);
}
/* add a root node to serve as the permanent handle to the tree */
@ -566,18 +566,16 @@ SubqueryEntryList(Query *queryTree)
* group, and limit nodes if they appear in the original query tree.
*/
MultiNode *
MultiNodeTree(Query *queryTree)
MultiNodeTree(Query *queryTree, PlannerRestrictionContext *plannerRestrictionContext)
{
List *rangeTableList = queryTree->rtable;
List *targetEntryList = queryTree->targetList;
List *joinClauseList = NIL;
List *joinOrderList = NIL;
List *tableEntryList = NIL;
List *tableNodeList = NIL;
List *collectTableList = NIL;
MultiNode *joinTreeNode = NULL;
MultiNode *currentTopNode = NULL;
bool passQualClauseDirectly = false;
/* verify we can perform distributed planning on this query */
DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
@ -587,17 +585,35 @@ MultiNodeTree(Query *queryTree)
RaiseDeferredError(unsupportedQueryError, ERROR);
}
/* extract where and join clause qualifiers(including outer join quals) and verify we can plan for them. */
List *qualClauseList = QualifierList(queryTree->jointree);
/* extract join and nonjoin clauses from plannerRestrictionContext */
RestrictInfoContext *restrictInfoContext = ExtractRestrictionInfosFromPlannerContext(
plannerRestrictionContext);
List *joinRestrictInfoList = restrictInfoContext->joinRestrictInfoList;
List *nonJoinRestrictionInfoList = restrictInfoContext->baseRestrictInfoList;
List *joinRestrictInfoListList = restrictInfoContext->joinRestrictInfoListList;
List *pseudoRestrictInfoList = restrictInfoContext->pseudoRestrictInfoList;
List *generatedEcJoinRestrictInfoList =
plannerRestrictionContext->joinRestrictionContext->generatedEcJoinRestrictInfoList;
/* verify we can plan for restriction clauses */
List *baseClauseList = ExtractRestrictClausesFromRestrictionInfoList(
nonJoinRestrictionInfoList);
List *allJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
joinRestrictInfoList);
List *pseudoClauseList = ExtractRestrictClausesFromRestrictionInfoList(
pseudoRestrictInfoList);
List *generatedEcJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
generatedEcJoinRestrictInfoList);
allJoinClauseList = list_concat_unique(allJoinClauseList, generatedEcJoinClauseList);
List *qualClauseList = list_concat_copy(baseClauseList, allJoinClauseList);
unsupportedQueryError = DeferErrorIfUnsupportedClause(qualClauseList);
if (unsupportedQueryError)
{
RaiseDeferredErrorInternal(unsupportedQueryError, ERROR);
}
/* WhereClauseList() merges join qualifiers and base qualifiers into result list */
List *whereClauseList = WhereClauseList(queryTree->jointree);
/*
* If we have a subquery, build a multi table node for the subquery and
* add a collect node on top of the multi table node.
@ -634,7 +650,7 @@ MultiNodeTree(Query *queryTree)
*/
Assert(list_length(subqueryEntryList) == 1);
List *whereClauseColumnList = pull_var_clause_default((Node *) whereClauseList);
List *whereClauseColumnList = pull_var_clause_default((Node *) baseClauseList);
List *targetListColumnList = pull_var_clause_default((Node *) targetEntryList);
List *columnList = list_concat(whereClauseColumnList, targetListColumnList);
@ -645,7 +661,8 @@ MultiNodeTree(Query *queryTree)
}
/* recursively create child nested multitree */
MultiNode *subqueryExtendedNode = MultiNodeTree(subqueryTree);
MultiNode *subqueryExtendedNode = MultiNodeTree(subqueryTree,
plannerRestrictionContext);
SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode);
SetChild((MultiUnaryNode *) subqueryNode, subqueryExtendedNode);
@ -686,35 +703,41 @@ MultiNodeTree(Query *queryTree)
/* extract join infos for left recursive join tree */
JoinInfoContext *joinInfoContext = FetchJoinOrderContext(queryTree->jointree);
/* where clause should only contain base qualifiers */
whereClauseList = joinInfoContext->baseQualifierList;
/* we simply donot commute joins as we have at least 1 outer join */
joinOrderList = FixedJoinOrderList(tableEntryList, joinInfoContext);
/* pass join clauses directly as they are while creating tasks */
passQualClauseDirectly = true;
joinOrderList = FixedJoinOrderList(tableEntryList, joinInfoContext,
joinRestrictInfoListList,
generatedEcJoinClauseList,
pseudoClauseList);
}
else
{
/* consider also base qualifications */
joinClauseList = JoinClauseList(whereClauseList);
/* find best join order for commutative inner joins */
joinOrderList = JoinOrderList(tableEntryList, joinClauseList);
joinOrderList = JoinOrderList(tableEntryList, joinRestrictInfoListList,
generatedEcJoinClauseList, pseudoClauseList);
}
/* build join tree using the join order and collected tables */
joinTreeNode = MultiJoinTree(joinOrderList, collectTableList,
passQualClauseDirectly);
joinTreeNode = MultiJoinTree(joinOrderList, collectTableList);
currentTopNode = joinTreeNode;
}
Assert(currentTopNode != NULL);
/* build select node if the query has selection criteria */
MultiSelect *selectNode = MultiSelectNode(whereClauseList, passQualClauseDirectly);
/*
* build select node if the query has selection criteria
* select node will have pushdownable and non-pushdownable parts.
* - all base clauses can be pushdownable
* - some of join clauses cannot be pushed down e.g. they can be only applied after join
*/
List *pushdownableSelectClauseList = baseClauseList;
List *nonpushdownableJoinClauseList = ExtractNonPushdownableJoinClauses(
joinOrderList);
List *nonPushdownableSelectClauseList = list_difference(allJoinClauseList,
nonpushdownableJoinClauseList);
MultiSelect *selectNode = MultiSelectNode(pushdownableSelectClauseList,
nonPushdownableSelectClauseList);
if (selectNode != NULL)
{
SetChild((MultiUnaryNode *) selectNode, currentTopNode);
@ -740,6 +763,128 @@ MultiNodeTree(Query *queryTree)
}
/*
* ExtractNonPushdownableJoinClauses returns pushdownable clauses from given join
* restrict infos.
*/
static List *
ExtractNonPushdownableJoinClauses(List *joinOrderList)
{
List *nonPushdownJoinClauseList = NIL;
JoinOrderNode *joinOrderNode = NULL;
foreach_ptr(joinOrderNode, joinOrderList)
{
List *joinClauselist = joinOrderNode->joinClauseList;
nonPushdownJoinClauseList = list_concat(nonPushdownJoinClauseList,
joinClauselist);
}
return nonPushdownJoinClauseList;
}
/*
* RestrictInfoContext extracts all RestrictionInfo from PlannerRestrictionContext.
*/
RestrictInfoContext *
ExtractRestrictionInfosFromPlannerContext(
PlannerRestrictionContext *plannerRestrictionContext)
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
List *baseRestrictInfoList = NIL;
List *joinRestrictInfoList = NIL;
List *joinRestrictInfoListList = NIL;
List *pseudoRestrictInfoList = NIL;
/* collect all restrictInfos from relationRestrictionContext */
RelationRestriction *relationRestriction = NULL;
foreach_ptr(relationRestriction, relationRestrictionContext->relationRestrictionList)
{
RelOptInfo *relOptInfo = relationRestriction->relOptInfo;
RestrictInfo *baseRestrictInfo = NULL;
foreach_ptr(baseRestrictInfo, relOptInfo->baserestrictinfo)
{
if (baseRestrictInfo->pseudoconstant)
{
pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList,
baseRestrictInfo);
continue;
}
baseRestrictInfoList = list_append_unique(baseRestrictInfoList,
baseRestrictInfo);
}
RestrictInfo *joinRestrictInfo = NULL;
foreach_ptr(joinRestrictInfo, relOptInfo->joininfo)
{
if (joinRestrictInfo->pseudoconstant)
{
pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList,
joinRestrictInfo);
}
}
}
/* collect all restrictInfos from joinRestrictionContext */
JoinRestriction *joinRestriction = NULL;
foreach_ptr(joinRestriction, joinRestrictionContext->joinRestrictionList)
{
List *currentJoinRestrictInfoList = NIL;
RestrictInfo *joinRestrictInfo = NULL;
foreach_ptr(joinRestrictInfo, joinRestriction->joinRestrictInfoList)
{
if (joinRestrictInfo->pseudoconstant)
{
pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList,
joinRestrictInfo);
continue;
}
currentJoinRestrictInfoList = list_append_unique(currentJoinRestrictInfoList,
joinRestrictInfo);
joinRestrictInfoList = list_append_unique(joinRestrictInfoList,
joinRestrictInfo);
}
joinRestrictInfoListList = lappend(joinRestrictInfoListList,
currentJoinRestrictInfoList);
}
RestrictInfoContext *restrictInfoContext = palloc0(sizeof(RestrictInfoContext));
restrictInfoContext->baseRestrictInfoList = baseRestrictInfoList;
restrictInfoContext->joinRestrictInfoList = joinRestrictInfoList;
restrictInfoContext->joinRestrictInfoListList = joinRestrictInfoListList;
restrictInfoContext->pseudoRestrictInfoList = pseudoRestrictInfoList;
return restrictInfoContext;
}
/*
* ExtractRestrictClausesFromRestrictionInfoList extracts RestrictInfo clauses from
* given restrictInfoList.
*/
List *
ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList)
{
List *restrictClauseList = NIL;
RestrictInfo *restrictInfo = NULL;
foreach_ptr(restrictInfo, restrictInfoList)
{
restrictClauseList = lappend(restrictClauseList, restrictInfo->clause);
}
return restrictClauseList;
}
/*
* FetchJoinOrderContext returns all join info for given node.
*/
@ -1357,28 +1502,6 @@ WhereClauseList(FromExpr *fromExpr)
}
/*
* QualifierList walks over the FROM expression in the query tree, and builds
* a list of all qualifiers from the expression tree. The function checks for
* both implicitly and explicitly defined qualifiers. Note that this function
* is very similar to WhereClauseList(), but QualifierList() also includes
* outer-join clauses.
*/
List *
QualifierList(FromExpr *fromExpr)
{
FromExpr *fromExprCopy = copyObject(fromExpr);
QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext));
List *qualifierList = NIL;
ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext);
qualifierList = list_concat(qualifierList, walkerContext->baseQualifierList);
qualifierList = list_concat(qualifierList, walkerContext->outerJoinQualifierList);
return qualifierList;
}
/*
* DeferErrorIfUnsupportedClause walks over the given list of clauses, and
* checks that we can recognize all the clauses. This function ensures that
@ -1405,31 +1528,6 @@ DeferErrorIfUnsupportedClause(List *clauseList)
}
/*
* JoinClauseList finds the join clauses from the given where clause expression
* list, and returns them. The function does not iterate into nested OR clauses
* and relies on find_duplicate_ors() in the optimizer to pull up factorizable
* OR clauses.
*/
List *
JoinClauseList(List *whereClauseList)
{
List *joinClauseList = NIL;
ListCell *whereClauseCell = NULL;
foreach(whereClauseCell, whereClauseList)
{
Node *whereClause = (Node *) lfirst(whereClauseCell);
if (IsJoinClause(whereClause))
{
joinClauseList = lappend(joinClauseList, whereClause);
}
}
return joinClauseList;
}
/*
* ExtractFromExpressionWalker walks over a FROM expression, and finds all
* implicit and explicit qualifiers in the expression. The function looks at
@ -1712,7 +1810,7 @@ AddMultiCollectNodes(List *tableNodeList)
* this tree after every table in the list has been joined.
*/
static MultiNode *
MultiJoinTree(List *joinOrderList, List *collectTableList, bool passJoinClauseDirectly)
MultiJoinTree(List *joinOrderList, List *collectTableList)
{
MultiNode *currentTopNode = NULL;
ListCell *joinOrderCell = NULL;
@ -1744,8 +1842,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, bool passJoinClauseDi
(MultiNode *) collectNode,
joinRuleType, partitionColumnList,
joinType,
joinClauseList,
passJoinClauseDirectly);
joinClauseList);
/* the new join node becomes the top of our join tree */
currentTopNode = newJoinNode;
@ -1792,33 +1889,20 @@ CollectNodeForTable(List *collectTableList, uint32 rangeTableId)
* MultiSelectNode extracts the select clauses from the given where clause list,
* and builds a MultiSelect node from these clauses. If the expression tree does
* not have any select clauses, the function return null.
*
* When we have at least 1 outer join in a query tree, we cannot commute joins(that is
* why we have `FixedJoinOrderList`) or move join and where clauses as we wish because
* we would have incorrect results. We should pass join and where clauses separately as
* they are while creating tasks. `whereClauseList` should be passed as it is when
* `passWhereClauseDirectly` is set true.
*/
static MultiSelect *
MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly)
MultiSelectNode(List *pushdownableClauseList, List *nonPushdownableClauseList)
{
List *selectClauseList = NIL;
MultiSelect *selectNode = NULL;
ListCell *whereClauseCell = NULL;
foreach(whereClauseCell, whereClauseList)
{
Node *whereClause = (Node *) lfirst(whereClauseCell);
if (passWhereClauseDirectly || IsSelectClause(whereClause))
{
selectClauseList = lappend(selectClauseList, whereClause);
}
}
if (list_length(selectClauseList) > 0)
if (list_length(pushdownableClauseList) > 0 ||
list_length(nonPushdownableClauseList) > 0)
{
selectNode = CitusMakeNode(MultiSelect);
selectNode->selectClauseList = selectClauseList;
selectNode->selectClauseList = list_concat_copy(pushdownableClauseList,
nonPushdownableClauseList);
selectNode->pushdownableSelectClauseList = pushdownableClauseList;
selectNode->nonPushdownableSelectClauseList = nonPushdownableClauseList;
}
return selectNode;
@ -2100,37 +2184,12 @@ pull_var_clause_default(Node *node)
*/
static MultiNode *
ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
List *partitionColumnList, JoinType joinType, List *joinClauseList,
bool passJoinClauseDirectly)
List *partitionColumnList, JoinType joinType, List *joinClauseList)
{
List *leftTableIdList = OutputTableIdList(leftNode);
List *rightTableIdList = OutputTableIdList(rightNode);
int rightTableIdCount PG_USED_FOR_ASSERTS_ONLY = 0;
rightTableIdCount = list_length(rightTableIdList);
Assert(rightTableIdCount == 1);
List *joinClauses = joinClauseList;
if (!passJoinClauseDirectly)
{
/* find applicable join clauses between the left and right data sources */
uint32 rightTableId = (uint32) linitial_int(rightTableIdList);
joinClauses = ApplicableJoinClauses(leftTableIdList, rightTableId,
joinClauseList);
}
/* call the join rule application function to create the new join node */
RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType);
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList,
joinType, joinClauses);
if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin))
{
MultiJoin *joinNode = (MultiJoin *) multiNode;
/* preserve non-join clauses for OUTER joins */
joinNode->joinClauseList = list_copy(joinClauseList);
}
joinType, joinClauseList);
return multiNode;
}

View File

@ -224,8 +224,18 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
* Some unsupported join clauses in logical planner
* may be supported by subquery pushdown planner.
*/
List *qualifierList = QualifierList(rewrittenQuery->jointree);
if (DeferErrorIfUnsupportedClause(qualifierList) != NULL)
RestrictInfoContext *restrictInfoContext = ExtractRestrictionInfosFromPlannerContext(
plannerRestrictionContext);
List *joinRestrictionInfoList = restrictInfoContext->joinRestrictInfoList;
List *nonJoinRestrictionInfoList = restrictInfoContext->baseRestrictInfoList;
/* verify we can plan for restriction clauses */
List *whereClauseList = ExtractRestrictClausesFromRestrictionInfoList(
nonJoinRestrictionInfoList);
List *joinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
joinRestrictionInfoList);
List *qualClauseList = list_concat_copy(whereClauseList, joinClauseList);
if (DeferErrorIfUnsupportedClause(qualClauseList) != NULL)
{
return true;
}

View File

@ -78,6 +78,7 @@ typedef struct JoinRestrictionContext
List *joinRestrictionList;
bool hasSemiJoin;
bool hasOuterJoin;
List *generatedEcJoinRestrictInfoList;
} JoinRestrictionContext;
typedef struct JoinRestriction
@ -105,6 +106,8 @@ typedef struct FastPathRestrictionContext
* Set to true when distKey = Param; in the queryTree
*/
bool distributionKeyHasParam;
int distRelId;
}FastPathRestrictionContext;
typedef struct PlannerRestrictionContext
@ -241,6 +244,8 @@ extern bool GetOriginalInh(RangeTblEntry *rte);
extern LOCKMODE GetQueryLockMode(Query *query);
extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern Query * ReplanAfterQueryModification(Query *originalQuery, ParamListInfo
boundParams);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);

View File

@ -44,6 +44,8 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
plannerRestrictionContext);
extern char * InsertSelectResultIdPrefix(uint64 planId);
extern bool PlanningInsertSelect(void);
extern void RelabelPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
#endif /* INSERT_SELECT_PLANNER_H */

View File

@ -17,6 +17,7 @@
#include "postgres.h"
#include "nodes/pathnodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
@ -30,6 +31,7 @@
typedef enum JoinRuleType
{
JOIN_RULE_INVALID_FIRST = 0,
REFERENCE_JOIN = 1,
LOCAL_PARTITION_JOIN = 2,
SINGLE_HASH_PARTITION_JOIN = 3,
@ -114,13 +116,18 @@ extern bool EnableSingleHashRepartitioning;
/* Function declaration for determining table join orders */
extern List * JoinExprList(FromExpr *fromExpr);
extern List * JoinOrderList(List *rangeTableEntryList, List *joinClauseList);
extern List * JoinOrderList(List *rangeTableEntryList, List *joinRestrictInfoListList,
List *generatedEcJoinClauseList, List *pseudoClauseList);
extern List * FixedJoinOrderList(List *rangeTableEntryList,
JoinInfoContext *joinInfoContext);
JoinInfoContext *joinInfoContext,
List *joinRestrictInfoListList,
List *generatedEcJoinClauseList,
List *pseudoClauseList);
extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId,
Node *joinClause);
extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId,
List *joinClauseList);
extern bool IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32
rightTableId,
RestrictInfo *restrictInfo);
extern bool NodeIsEqualsOpExpr(Node *node);
extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable);

View File

@ -124,6 +124,8 @@ typedef struct MultiSelect
{
MultiUnaryNode unaryNode;
List *selectClauseList;
List *pushdownableSelectClauseList;
List *nonPushdownableSelectClauseList;
} MultiSelect;
@ -188,6 +190,17 @@ typedef struct MultiExtendedOp
} MultiExtendedOp;
/* RestrictInfoContext stores join and base restriction infos extracted from planner context*/
typedef struct RestrictInfoContext
{
List *baseRestrictInfoList;
List *joinRestrictInfoList;
List *joinRestrictInfoListList;
List *generatedEcJoinClauseList;
List *pseudoRestrictInfoList;
} RestrictInfoContext;
/* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *
@ -214,12 +227,13 @@ extern bool UnaryOperator(MultiNode *node);
extern bool BinaryOperator(MultiNode *node);
extern List * OutputTableIdList(MultiNode *multiNode);
extern List * FindNodesOfType(MultiNode *node, int type);
extern List * JoinClauseList(List *whereClauseList);
extern bool IsJoinClause(Node *clause);
extern List * SubqueryEntryList(Query *queryTree);
extern DeferredErrorMessage * DeferErrorIfQueryNotSupported(Query *queryTree);
extern List * WhereClauseList(FromExpr *fromExpr);
extern List * QualifierList(FromExpr *fromExpr);
extern RestrictInfoContext * ExtractRestrictionInfosFromPlannerContext(
PlannerRestrictionContext *plannerRestrictionContext);
extern List * ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList);
extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern List * pull_var_clause_default(Node *node);
@ -229,7 +243,8 @@ extern MultiProject * MultiProjectNode(List *targetEntryList);
extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree, Query *originalQuery);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query *
subqueryTree);
extern MultiNode * MultiNodeTree(Query *queryTree);
extern MultiNode * MultiNodeTree(Query *queryTree,
PlannerRestrictionContext *plannerRestrictionContext);
#endif /* MULTI_LOGICAL_PLANNER_H */

View File

@ -47,6 +47,7 @@ order by s_i_id;
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Seq Scan on stock_1640000 stock
Filter: (s_order_cnt IS NOT NULL)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -57,7 +58,7 @@ order by s_i_id;
-> Function Scan on read_intermediate_result intermediate_result
-> Seq Scan on stock_1640000 stock
Filter: ((s_order_cnt)::numeric > $0)
(36 rows)
(37 rows)
explain (costs false, summary false, timing false)
select s_i_id, sum(s_order_cnt) as ordercount
@ -84,6 +85,7 @@ order by s_i_id;
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Seq Scan on stock_1640000 stock
Filter: (s_order_cnt IS NOT NULL)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -91,7 +93,7 @@ order by s_i_id;
-> HashAggregate
Group Key: stock.s_i_id
-> Seq Scan on stock_1640000 stock
(24 rows)
(25 rows)
explain (costs false, summary false, timing false)
select s_i_id, sum(s_order_cnt) as ordercount
@ -115,6 +117,7 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Seq Scan on stock_1640000 stock
Filter: (s_order_cnt IS NOT NULL)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -122,7 +125,7 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st
-> HashAggregate
Group Key: stock.s_i_id
-> Seq Scan on stock_1640000 stock
(22 rows)
(23 rows)
explain (costs false)
select s_i_id, sum(s_order_cnt) as ordercount

View File

@ -345,7 +345,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_entry_d;
LOG: join order: [ "customer" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "new_order" ][ local partition join(INNER) "order_line" ]
LOG: join order: [ "customer" ][ local partition join(INNER) "new_order" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "order_line" ]
ol_o_id | ol_w_id | ol_d_id | revenue | o_entry_d
---------------------------------------------------------------------
10 | 10 | 10 | 10.00 | Fri Oct 17 00:00:00 2008
@ -472,7 +472,7 @@ ORDER BY
su_nationkey,
cust_nation,
l_year;
LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "nation" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "stock" ]
LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
supp_nation | cust_nation | l_year | revenue
---------------------------------------------------------------------
9 | C | 2008 | 3.00
@ -963,7 +963,7 @@ ORDER BY
su_nationkey,
cust_nation,
l_year;
LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "nation" ][ reference join(INNER) "supplier" ][ single hash partition join(INNER) "stock" ]
LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ single hash partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
supp_nation | cust_nation | l_year | revenue
---------------------------------------------------------------------
9 | C | 2008 | 3.00
@ -1003,7 +1003,7 @@ WHERE i_id = s_i_id
AND i_id = ol_i_id
GROUP BY extract(YEAR FROM o_entry_d)
ORDER BY l_year;
LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "region" ][ single hash partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "region" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
l_year | mkt_share
---------------------------------------------------------------------
2008 | 0.50000000000000000000
@ -1036,7 +1036,7 @@ GROUP BY
ORDER BY
n_name,
l_year DESC;
LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ single hash partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ]
n_name | l_year | sum_profit
---------------------------------------------------------------------
Germany | 2008 | 3.00

View File

@ -189,12 +189,14 @@ SELECT count(*) FROM users_table u1 CROSS JOIN users_ref_test_table ref2 JOIN us
reset citus.enable_repartition_joins;
-- although the following has the "ref LEFT JOIN dist" type of query, the LEFT JOIN is eliminated by Postgres
-- because the INNER JOIN eliminates the LEFT JOIN
SET citus.enable_repartition_joins TO on;
SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) JOIN users_table u2 ON (u2.user_id = users_table.user_id);
count
---------------------------------------------------------------------
11802
(1 row)
reset citus.enable_repartition_joins;
-- this is the same query as the above, but this time the outer query is also LEFT JOIN, meaning that Postgres
-- cannot eliminate the outer join
SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) LEFT JOIN users_table u2 ON (u2.user_id = users_table.user_id);

View File

@ -102,7 +102,9 @@ LOG: join order: [ "lineitem" ][ local partition join(INNER) "orders" ]
EXPLAIN (COSTS OFF)
SELECT l_quantity FROM lineitem, orders
WHERE (l_orderkey = o_orderkey OR l_quantity > 5);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
LOG: join order: [ "lineitem" ][ cartesian product(INNER) "orders" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;

View File

@ -41,7 +41,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -69,7 +69,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -124,7 +124,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -152,7 +152,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------
@ -180,7 +180,7 @@ GROUP BY
ORDER BY
l_partkey, l_suppkey
LIMIT 10;
LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ]
LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ]
DEBUG: push down of limit count: 10
l_partkey | l_suppkey | count
---------------------------------------------------------------------

View File

@ -1074,8 +1074,96 @@ DETAIL: Cartesian products are currently unsupported
-- join order planner cannot handle right recursive joins
SELECT t1.*, t2.* FROM test_hash1 t1 LEFT JOIN ( test_hash2 t2 JOIN test_hash3 t3 ON t2.col2 = t3.col1) ON (t1.col1 = t2.col1) ORDER BY 1,2,3,4;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- sometimes join filters are pushed down and applied before join by PG
CREATE TABLE dist1 (x INT, y INT);
CREATE TABLE dist2 (x INT, y INT);
SELECT create_distributed_table('dist1','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist2','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist1 VALUES (1,2);
INSERT INTO dist1 VALUES (3,4);
INSERT INTO dist2 VALUES (1,2);
INSERT INTO dist2 VALUES (5,6);
-- single join condition
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
1 | 2 | 1 | 2
3 | 4 | |
(2 rows)
-- single join condition and dist2.x >2 will be pushed down as it is on inner part of the join. e.g. filter out dist2.x <= 2 beforehand
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist2.x >2) ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
1 | 2 | |
3 | 4 | |
(2 rows)
-- single join condition and dist2.x >2 is regular filter and applied after join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist2.x >2 ORDER BY 1,2,3,4;
LOG: join order: [ "dist1" ][ local partition join(INNER) "dist2" ]
x | y | x | y
---------------------------------------------------------------------
(0 rows)
-- single join condition and dist1.x >2 will not be pushed down as it is on outer part of the join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist1.x >2) ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
1 | 2 | |
3 | 4 | |
(2 rows)
-- single join condition and dist1.x >2 is regular filter and applied after join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist1.x >2 ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
3 | 4 | |
(1 row)
-- constant false filter as join filter for left join.
-- Inner table will be converted to empty result. Constant filter will be applied before join but will not be pushdowned.
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4;
LOG: join order: [ "dist1" ][ cartesian product(LEFT) "dist2" ]
x | y | x | y
---------------------------------------------------------------------
1 | 2 | |
3 | 4 | |
(2 rows)
-- constant false filter as base filter for left join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
(0 rows)
-- constant false filter as join filter for inner join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
(0 rows)
-- constant false filter as base filter for inner join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4;
x | y | x | y
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA non_colocated_outer_joins CASCADE;
NOTICE: drop cascades to 9 other objects
NOTICE: drop cascades to 11 other objects
DETAIL: drop cascades to table test_hash1
drop cascades to table test_hash2
drop cascades to table test_hash3
@ -1085,6 +1173,8 @@ drop cascades to table test_append3
drop cascades to table test_range1
drop cascades to table test_range2
drop cascades to table test_range3
drop cascades to table dist1
drop cascades to table dist2
RESET client_min_messages;
RESET citus.log_multi_join_order;
RESET citus.enable_repartition_joins;

View File

@ -98,7 +98,7 @@ FROM
WHERE
r1.id = t1.id AND t2.sum = t1.id;
DEBUG: Router planner cannot handle multi-shard select queries
LOG: join order: [ "single_hash_repartition_second" ][ reference join(INNER) "ref_table" ][ single hash partition join(INNER) "single_hash_repartition_first" ]
LOG: join order: [ "single_hash_repartition_first" ][ reference join(INNER) "ref_table" ][ single hash partition join(INNER) "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
@ -326,7 +326,19 @@ FROM
WHERE
t1.id = t2.sum AND t2.sum = t3.id;
DEBUG: Router planner cannot handle multi-shard select queries
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join(INNER) "single_hash_repartition_second" ][ single hash partition join(INNER) "single_hash_repartition_second" ]
LOG: join order: [ "single_hash_repartition_first" ][ local partition join(INNER) "single_hash_repartition_second" ][ single hash partition join(INNER) "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
@ -347,26 +359,6 @@ DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 24
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- two single repartitions again, but this

View File

@ -65,7 +65,9 @@ reset citus.enable_repartition_joins;
-- although the following has the "ref LEFT JOIN dist" type of query, the LEFT JOIN is eliminated by Postgres
-- because the INNER JOIN eliminates the LEFT JOIN
SET citus.enable_repartition_joins TO on;
SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) JOIN users_table u2 ON (u2.user_id = users_table.user_id);
reset citus.enable_repartition_joins;
-- this is the same query as the above, but this time the outer query is also LEFT JOIN, meaning that Postgres
-- cannot eliminate the outer join

View File

@ -205,6 +205,43 @@ SELECT tt1.*, t3.* FROM (SELECT t1.* FROM test_hash1 t1, test_hash2 t2) tt1 LEFT
SELECT t1.*, t2.* FROM test_hash1 t1 LEFT JOIN ( test_hash2 t2 JOIN test_hash3 t3 ON t2.col2 = t3.col1) ON (t1.col1 = t2.col1) ORDER BY 1,2,3,4;
-- sometimes join filters are pushed down and applied before join by PG
CREATE TABLE dist1 (x INT, y INT);
CREATE TABLE dist2 (x INT, y INT);
SELECT create_distributed_table('dist1','x');
SELECT create_distributed_table('dist2','x');
INSERT INTO dist1 VALUES (1,2);
INSERT INTO dist1 VALUES (3,4);
INSERT INTO dist2 VALUES (1,2);
INSERT INTO dist2 VALUES (5,6);
-- single join condition
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) ORDER BY 1,2,3,4;
-- single join condition and dist2.x >2 will be pushed down as it is on inner part of the join. e.g. filter out dist2.x <= 2 beforehand
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist2.x >2) ORDER BY 1,2,3,4;
-- single join condition and dist2.x >2 is regular filter and applied after join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist2.x >2 ORDER BY 1,2,3,4;
-- single join condition and dist1.x >2 will not be pushed down as it is on outer part of the join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist1.x >2) ORDER BY 1,2,3,4;
-- single join condition and dist1.x >2 is regular filter and applied after join
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist1.x >2 ORDER BY 1,2,3,4;
-- constant false filter as join filter for left join.
-- Inner table will be converted to empty result. Constant filter will be applied before join but will not be pushdowned.
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4;
-- constant false filter as base filter for left join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4;
-- constant false filter as join filter for inner join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4;
-- constant false filter as base filter for inner join.
-- Both tables will be converted to empty result .e.g RTE_RESULT
SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4;
DROP SCHEMA non_colocated_outer_joins CASCADE;
RESET client_min_messages;
RESET citus.log_multi_join_order;