union_all_join
Onder Kalaci 2021-02-15 18:06:36 +01:00
parent 0f1ce7a913
commit 6c903cd6f7
1 changed files with 85 additions and 2 deletions

View File

@ -110,6 +110,7 @@ struct RecursivePlanningContextInternal
int level;
uint64 planId;
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
bool setOperationJoinWithAnyRTE;
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
};
@ -141,6 +142,7 @@ typedef struct VarLevelsUpWalkerContext
static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query,
RecursivePlanningContext *
context);
static bool ContainsSetOperationJoinRTE(JoinRestrictionContext *joinRestrictionContext);
static bool ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
RecursivePlanningContext *
context);
@ -220,6 +222,10 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
context.subPlanList = NIL;
context.plannerRestrictionContext = plannerRestrictionContext;
context.setOperationJoinWithAnyRTE =
ContainsSetOperationJoinRTE(plannerRestrictionContext->joinRestrictionContext);
/*
* Calculating the distribution key equality upfront is a trade-off for us.
*
@ -232,8 +238,12 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
* calculating this wouldn't help us at all, we should individually check
* each each subquery and subquery joins among subqueries.
*/
context.allDistributionKeysInQueryAreEqual =
AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
if (!context.setOperationJoinWithAnyRTE)
{
context.allDistributionKeysInQueryAreEqual =
AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
}
DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(originalQuery,
&context);
@ -259,6 +269,74 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
}
static bool
ContainsSetOperationJoinRTE(JoinRestrictionContext *joinRestrictionContext)
{
List *attributeEquivalenceList = NIL;
ListCell *joinRestrictionCell = NULL;
if (joinRestrictionContext == NULL)
{
return false;
}
foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList)
{
JoinRestriction *joinRestriction =
(JoinRestriction *) lfirst(joinRestrictionCell);
ListCell *restrictionInfoList = NULL;
foreach(restrictionInfoList, joinRestriction->joinRestrictInfoList)
{
RestrictInfo *rinfo = (RestrictInfo *) lfirst(restrictionInfoList);
Expr *restrictionClause = rinfo->clause;
if (!IsA(restrictionClause, OpExpr))
{
continue;
}
OpExpr *restrictionOpExpr = (OpExpr *) restrictionClause;
if (list_length(restrictionOpExpr->args) != 2)
{
continue;
}
if (!OperatorImplementsEquality(restrictionOpExpr->opno))
{
continue;
}
Node *leftNode = linitial(restrictionOpExpr->args);
Node *rightNode = lsecond(restrictionOpExpr->args);
/* we also don't want implicit coercions */
Expr *strippedLeftExpr = (Expr *) strip_implicit_coercions((Node *) leftNode);
Expr *strippedRightExpr =
(Expr *) strip_implicit_coercions((Node *) rightNode);
if (!(IsA(strippedLeftExpr, Var) && IsA(strippedRightExpr, Var)))
{
continue;
}
PlannerInfo *root = joinRestriction->plannerInfo;
Var *leftVar = (Var *) strippedLeftExpr;
RangeTblEntry *leftRangeTblEntry = root->simple_rte_array[leftVar->varno];
Var *rightVar = (Var *) strippedRightExpr;
RangeTblEntry *rightRangeTblEntry = root->simple_rte_array[rightVar->varno];
if (leftRangeTblEntry->inh || rightRangeTblEntry->inh)
{
return true;
}
}
}
return false;
}
/*
* RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
* workers directly and instead plans them by recursively calling the planner and
@ -1010,6 +1088,11 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex
return true;
}
if (context->setOperationJoinWithAnyRTE)
{
return true;
}
if (DeferErrorIfUnsupportedUnionQuery(query) != NULL)
{
/*