outer-join-noncolocated-dist-tables
aykutbozkurt 2023-01-19 23:04:33 +03:00
parent 222f11e23e
commit b9d8841f43
8 changed files with 108 additions and 82 deletions

View File

@ -127,14 +127,17 @@ static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue, int
fastPathRelId);
Node *distributionKeyValue,
int fastPathRelId);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList);
static List * GenerateImplicitJoinRestrictInfoList(PlannerInfo *plannerInfo,
RelOptInfo *innerrel,
RelOptInfo *outerrel);
/* Distributed planner hook */
PlannedStmt *
@ -720,6 +723,7 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
planContext->boundParams);
/* generate simple base restrict info inside plannerRestrictionContext */
RelabelPlannerRestrictionContext(planContext->plannerRestrictionContext);
return CreateDistributedPlannedStmt(planContext);
@ -968,8 +972,8 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
/*
* ReplanAfterQueryModification replans modified originalquery to update plannercontext
* properly. Returns modified query.
* ReplanAfterQueryModification replans given query to update plannercontext
* accordingly. Returns modified query without chnaging given query.
*/
Query *
ReplanAfterQueryModification(Query *originalQuery, ParamListInfo boundParams)
@ -1847,6 +1851,46 @@ CheckNodeCopyAndSerialization(Node *node)
}
/*
* GenerateImplicitJoinRestrictInfoList generates implicit join restrict infos
* by using planner information. As we rely on join restriction infos at join
* planner, we generate implicit join restrict infos. When join condition contains
* a reducible constant, generate_join_implied_equalities does not generate implicit
* join clauses. Hence, we mark ec_has_const to false beforehand. At the end, we
* restore previous values.
*/
static List *
GenerateImplicitJoinRestrictInfoList(PlannerInfo *plannerInfo,
RelOptInfo *innerrel, RelOptInfo *outerrel)
{
List *generatedRestrictInfoList = NIL;
Relids joinrelids = bms_union(innerrel->relids, outerrel->relids);
List *prevVals = NIL;
EquivalenceClass *eqclass = NULL;
foreach_ptr(eqclass, plannerInfo->eq_classes)
{
prevVals = lappend_int(prevVals, eqclass->ec_has_const);
eqclass->ec_has_const = false;
}
generatedRestrictInfoList = generate_join_implied_equalities(
plannerInfo,
joinrelids,
outerrel->relids,
innerrel);
int i;
for (i = 0; i < list_length(prevVals); i++)
{
EquivalenceClass *eqClass = list_nth(plannerInfo->eq_classes, i);
eqClass->ec_has_const = list_nth_int(prevVals, i);
}
return generatedRestrictInfoList;
}
/*
* multi_join_restriction_hook is a hook called by postgresql standard planner
* to notify us about various planning information regarding joins. We use
@ -1899,26 +1943,8 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestriction->innerrelRelids = bms_copy(innerrel->relids);
joinRestriction->outerrelRelids = bms_copy(outerrel->relids);
Relids joinrelids = bms_union(innerrel->relids, outerrel->relids);
List *prevVals = NIL;
EquivalenceClass *eqclass = NULL;
foreach_ptr(eqclass, root->eq_classes)
{
prevVals = lappend_int(prevVals, eqclass->ec_has_const);
eqclass->ec_has_const = false;
}
joinRestrictionContext->generatedEcJoinRestrictInfoList =
generate_join_implied_equalities(
root,
joinrelids,
outerrel->relids,
innerrel);
int i;
for (i = 0; i < list_length(prevVals); i++)
{
EquivalenceClass *eqClass = list_nth(root->eq_classes, i);
eqClass->ec_has_const = list_nth_int(prevVals, i);
}
GenerateImplicitJoinRestrictInfoList(root, innerrel, outerrel);
joinRestrictionContext->joinRestrictionList =
lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);

View File

@ -411,11 +411,23 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKe
Assert(columnInExpr);
bool distColumnExists = equal(distColumn, columnInExpr);
if (distColumnExists && constantClause != NULL &&
distColumn->vartype == constantClause->consttype &&
*distributionKeyValue == NULL)
{
/* if the vartypes do not match, let shard pruning handle it later */
*distributionKeyValue = (Node *) copyObject(constantClause);
if (distColumn->vartype == constantClause->consttype)
{
*distributionKeyValue = (Node *) copyObject(constantClause);
}
else
{
/* if the vartypes do not match, let shard pruning handle it later */
bool missingOk = true;
Const *transformedConstantClause =
TransformPartitionRestrictionValue(distColumn, constantClause, missingOk);
if (transformedConstantClause)
{
*distributionKeyValue = (Node *) copyObject(transformedConstantClause);
}
}
}
else if (paramClause != NULL)
{

View File

@ -364,7 +364,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
/*
* RelabelPlannerRestrictionContext relabels var nos inside restriction infos to 1.
* RelabelPlannerRestrictionContext relabels all Var varnos inside plannerRestrictionContext
* restriction infos to 1. If we have an unempty fastpath context, we manually create a single
* base RestrictInfo as we didnot call standard_planner to create it.
*/
void
RelabelPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext)
@ -443,6 +445,7 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
/* relabels all Var varnos inside plannerRestrictionContext after we modify query */
RelabelPlannerRestrictionContext(plannerRestrictionContext);
bool allowRecursivePlanning = true;

View File

@ -76,8 +76,8 @@ static List * FindJoinClauseForTables(List *joinRestrictInfoListList,
JoinType joinType);
static const char * JoinTypeName(JoinType jointype);
static List * ExtractPushdownJoinRestrictInfos(List *joinRestrictInfoList,
RestrictInfo *joinRestrictInfo, JoinType
joinType);
RestrictInfo *joinRestrictInfo,
JoinType joinType);
/* Local functions forward declarations for join evaluations */
static JoinOrderNode * EvaluateJoinRules(List *joinedTableList,
@ -373,7 +373,8 @@ TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx)
*/
static List *
ExtractPushdownJoinRestrictInfos(List *restrictInfoListOfJoin,
RestrictInfo *joinRestrictInfo, JoinType joinType)
RestrictInfo *joinRestrictInfo,
JoinType joinType)
{
List *joinFilterRestrictInfoList = NIL;
@ -415,18 +416,19 @@ FindJoinClauseForTables(List *joinRestrictInfoListList, List *generatedEcJoinCla
rhsTableId,
restrictClause))
{
List *pushdownFakeRestrictInfoList = ExtractPushdownJoinRestrictInfos(
List *pushdownableJoinRestrictInfoList = ExtractPushdownJoinRestrictInfos(
joinRestrictInfoList, joinRestrictInfo, joinType);
List *nonPushdownRestrictInfoList = list_difference(joinRestrictInfoList,
pushdownFakeRestrictInfoList);
List *nonPushdownRestrictClauseList =
ExtractRestrictClausesFromRestrictionInfoList(
nonPushdownRestrictInfoList);
return nonPushdownRestrictClauseList;
List *nonPushdownableJoinRestrictInfoList = list_difference(
joinRestrictInfoList,
pushdownableJoinRestrictInfoList);
List *nonPushdownableJoinRestrictClauseList =
get_all_actual_clauses(nonPushdownableJoinRestrictInfoList);
return nonPushdownableJoinRestrictClauseList;
}
}
}
/* we can find applicable join clause inside generated implicit join clauses */
if (joinType == JOIN_INNER)
{
Node *ecClause = NULL;
@ -1292,7 +1294,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
/*
* we only allow single range redistribution if both tables are range distributed.
* The reason is that we cannot guarantee all values in nonrange distributed table
* The reason is that we cannot guarantee all values in non-range distributed table
* will be inside the shard ranges of range distributed table.
*/
if ((currentPartitionMethod == DISTRIBUTE_BY_RANGE && candidatePartitionMethod !=
@ -1582,8 +1584,8 @@ IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinCla
/*
* IsApplicableFalseConstantJoinClause returns true if it can find a constant false filter
* which is applied to right table and also at least one of the table in left tables.
* IsApplicableFalseConstantJoinClause returns true if given restrictinfo is a constant false
* filter which is applied to right table and also at least one of the table in left tables.
*/
bool
IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32 rightTableId,

View File

@ -31,6 +31,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/shard_pruning.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/worker_protocol.h"
@ -596,13 +597,10 @@ MultiNodeTree(Query *queryTree, PlannerRestrictionContext *plannerRestrictionCon
plannerRestrictionContext->joinRestrictionContext->generatedEcJoinRestrictInfoList;
/* verify we can plan for restriction clauses */
List *baseClauseList = ExtractRestrictClausesFromRestrictionInfoList(
nonJoinRestrictionInfoList);
List *allJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
joinRestrictInfoList);
List *pseudoClauseList = ExtractRestrictClausesFromRestrictionInfoList(
pseudoRestrictInfoList);
List *generatedEcJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
List *baseClauseList = get_all_actual_clauses(nonJoinRestrictionInfoList);
List *allJoinClauseList = get_all_actual_clauses(joinRestrictInfoList);
List *pseudoClauseList = get_all_actual_clauses(pseudoRestrictInfoList);
List *generatedEcJoinClauseList = get_all_actual_clauses(
generatedEcJoinRestrictInfoList);
allJoinClauseList = list_concat_unique(allJoinClauseList, generatedEcJoinClauseList);
@ -729,13 +727,17 @@ MultiNodeTree(Query *queryTree, PlannerRestrictionContext *plannerRestrictionCon
* build select node if the query has selection criteria
* select node will have pushdownable and non-pushdownable parts.
* - all base clauses can be pushdownable
* - some of join clauses cannot be pushed down e.g. they can be only applied after join
* - some of join clauses cannot be pushed down and they can only be applied after join
* as join condition. Those should stay in MultiJoin.
* - some of join clauses can be pushed down. Those should be in nonpushdownable part of
* MultiSelect. ??? todo: can we also pushdown those to workers for optimization
*/
List *pushdownableSelectClauseList = baseClauseList;
List *nonpushdownableJoinClauseList = ExtractNonPushdownableJoinClauses(
joinOrderList);
List *nonPushdownableSelectClauseList = list_difference(allJoinClauseList,
nonpushdownableJoinClauseList);
List *pushdownableJoinClauseList = list_difference(allJoinClauseList,
nonpushdownableJoinClauseList);
List *nonPushdownableSelectClauseList = pushdownableJoinClauseList;
MultiSelect *selectNode = MultiSelectNode(pushdownableSelectClauseList,
nonPushdownableSelectClauseList);
if (selectNode != NULL)
@ -866,25 +868,6 @@ ExtractRestrictionInfosFromPlannerContext(
}
/*
* ExtractRestrictClausesFromRestrictionInfoList extracts RestrictInfo clauses from
* given restrictInfoList.
*/
List *
ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList)
{
List *restrictClauseList = NIL;
RestrictInfo *restrictInfo = NULL;
foreach_ptr(restrictInfo, restrictInfoList)
{
restrictClauseList = lappend(restrictClauseList, restrictInfo->clause);
}
return restrictClauseList;
}
/*
* FetchJoinOrderContext returns all join info for given node.
*/

View File

@ -36,6 +36,7 @@
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "nodes/nodeFuncs.h"
#include "nodes/makefuncs.h"
@ -214,10 +215,6 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
{
return true;
}
/*
* todo: join order planner cannot handle lateral join trees for outer joins.
*/
}
/*
@ -230,10 +227,8 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
List *nonJoinRestrictionInfoList = restrictInfoContext->baseRestrictInfoList;
/* verify we can plan for restriction clauses */
List *whereClauseList = ExtractRestrictClausesFromRestrictionInfoList(
nonJoinRestrictionInfoList);
List *joinClauseList = ExtractRestrictClausesFromRestrictionInfoList(
joinRestrictionInfoList);
List *whereClauseList = get_all_actual_clauses(nonJoinRestrictionInfoList);
List *joinClauseList = get_all_actual_clauses(joinRestrictionInfoList);
List *qualClauseList = list_concat_copy(whereClauseList, joinClauseList);
if (DeferErrorIfUnsupportedClause(qualClauseList) != NULL)
{

View File

@ -31,7 +31,6 @@
typedef enum JoinRuleType
{
JOIN_RULE_INVALID_FIRST = 0,
REFERENCE_JOIN = 1,
LOCAL_PARTITION_JOIN = 2,
SINGLE_HASH_PARTITION_JOIN = 3,
@ -125,8 +124,8 @@ extern List * FixedJoinOrderList(List *rangeTableEntryList,
List *pseudoClauseList);
extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId,
Node *joinClause);
extern bool IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32
rightTableId,
extern bool IsApplicableFalseConstantJoinClause(List *leftTableIdList,
uint32 rightTableId,
RestrictInfo *restrictInfo);
extern bool NodeIsEqualsOpExpr(Node *node);
extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,

View File

@ -190,7 +190,14 @@ typedef struct MultiExtendedOp
} MultiExtendedOp;
/* RestrictInfoContext stores join and base restriction infos extracted from planner context*/
/*
* RestrictInfoContext stores join and base restriction infos extracted from planner context
* baseRestrictInfoList: WHERE <>
* joinRestrictInfoList JOIN ON <>
* joinRestrictInfoListList: stores colocated join restrictions in 2 dimensional list
* generatedEcJoinClauseList: stores generated implicit join clauses
* pseudoRestrictInfoList: stores all pseudoconstant restrict infos
*/
typedef struct RestrictInfoContext
{
List *baseRestrictInfoList;
@ -233,7 +240,6 @@ extern DeferredErrorMessage * DeferErrorIfQueryNotSupported(Query *queryTree);
extern List * WhereClauseList(FromExpr *fromExpr);
extern RestrictInfoContext * ExtractRestrictionInfosFromPlannerContext(
PlannerRestrictionContext *plannerRestrictionContext);
extern List * ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList);
extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern List * pull_var_clause_default(Node *node);