Apply @mtuncer's reviews

pull/1282/head
Onder Kalaci 2017-04-03 13:32:55 +03:00
parent 427af34719
commit 4c41096370
6 changed files with 172 additions and 140 deletions

View File

@ -32,7 +32,7 @@ static uint32 attributeEquivalenceId = 1;
* relation attributes (i.e., not random expressions), we create an * relation attributes (i.e., not random expressions), we create an
* AttributeEquivalenceClass to record this knowledge. If we later find another * AttributeEquivalenceClass to record this knowledge. If we later find another
* equivalence B = C, we create another AttributeEquivalenceClass. Finally, we can * equivalence B = C, we create another AttributeEquivalenceClass. Finally, we can
* apply transitity rules and generate a new AttributeEquivalenceClass which includes * apply transitivity rules and generate a new AttributeEquivalenceClass which includes
* A, B and C. * A, B and C.
* *
* Note that equality among the members are identified by the varattno and rteIdentity. * Note that equality among the members are identified by the varattno and rteIdentity.
@ -45,20 +45,20 @@ typedef struct AttributeEquivalenceClass
/* /*
* AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClassMember - one member expression of an
* AttributeEquivalenceClassMember. The important thing to consider is that * AttributeEquivalenceClass. The important thing to consider is that
* the class member contains "rteIndentity" field. Note that each RTE_RELATION * the class member contains "rteIndentity" field. Note that each RTE_RELATION
* is assigned a unique rteIdentity in AssignRTEIdentities() function. * is assigned a unique rteIdentity in AssignRTEIdentities() function.
* *
* "varno" and "varattrno" is directly used from a Var clause that is being added * "varno" and "varattno" is directly used from a Var clause that is being added
* to the attribute equivalence. Since we only use this class for relations, the member * to the attribute equivalence. Since we only use this class for relations, the member
* also includes the relation id field. * also includes the relation id field.
*/ */
typedef struct AttributeEquivalenceClassMember typedef struct AttributeEquivalenceClassMember
{ {
Oid relationId;
int rteIdentity;
Index varno; Index varno;
AttrNumber varattno; AttrNumber varattno;
Oid relationId;
int rteIdendity;
} AttributeEquivalenceClassMember; } AttributeEquivalenceClassMember;
@ -67,9 +67,9 @@ static List * GenerateAttributeEquivalencesForRelationRestrictions(
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
static AttributeEquivalenceClass * AttributeEquivalenceClassForEquivalenceClass( static AttributeEquivalenceClass * AttributeEquivalenceClassForEquivalenceClass(
EquivalenceClass *plannerEqClass, RelationRestriction *relationRestriction); EquivalenceClass *plannerEqClass, RelationRestriction *relationRestriction);
static void AddToAttributeEquivalenceClass(PlannerInfo *root, Var *varToBeAdded, static void AddToAttributeEquivalenceClass(AttributeEquivalenceClass **
AttributeEquivalenceClass ** attributeEquivalanceClass,
attributeEquivalanceClass); PlannerInfo *root, Var *varToBeAdded);
static Var * GetVarFromAssignedParam(List *parentPlannerParamList, static Var * GetVarFromAssignedParam(List *parentPlannerParamList,
Param *plannerParam); Param *plannerParam);
static List * GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext static List * GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext
@ -78,9 +78,9 @@ static bool AttributeClassContainsAttributeClassMember(AttributeEquivalenceClass
inputMember, inputMember,
AttributeEquivalenceClass * AttributeEquivalenceClass *
attributeEquivalenceClass); attributeEquivalenceClass);
static List * AddAttributeClassToAttributeClassList(AttributeEquivalenceClass * static List * AddAttributeClassToAttributeClassList(List *attributeEquivalenceList,
attributeEquivalance, AttributeEquivalenceClass *
List *attributeEquivalenceList); attributeEquivalance);
static AttributeEquivalenceClass * GenerateCommonEquivalence(List * static AttributeEquivalenceClass * GenerateCommonEquivalence(List *
attributeEquivalenceList); attributeEquivalenceList);
static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass ** static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass **
@ -94,6 +94,12 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
* RTE_RELATION follows the above rule, we can conclude that all RTE_RELATIONs are * RTE_RELATION follows the above rule, we can conclude that all RTE_RELATIONs are
* joined on their partition keys. * joined on their partition keys.
* *
* The function returns true if all relations are joined on their partition keys.
* Otherwise, the function returns false. Since reference tables do not have partition
* keys, we skip processing them. Also, if the query includes only a single non-reference
* distributed relation, the function returns true since it doesn't make sense to check
* for partition key equality in that case.
*
* In order to do that, we invented a new equivalence class namely: * In order to do that, we invented a new equivalence class namely:
* AttributeEquivalenceClass. In very simple words, a AttributeEquivalenceClass is * AttributeEquivalenceClass. In very simple words, a AttributeEquivalenceClass is
* identified by an unique id and consists of a list of AttributeEquivalenceMembers. * identified by an unique id and consists of a list of AttributeEquivalenceMembers.
@ -112,14 +118,15 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
* to find as much as information that Postgres planner provides to extensions. For the * to find as much as information that Postgres planner provides to extensions. For the
* details of the usage, please see GenerateAttributeEquivalencesForRelationRestrictions() * details of the usage, please see GenerateAttributeEquivalencesForRelationRestrictions()
* and GenerateAttributeEquivalencesForJoinRestrictions() * and GenerateAttributeEquivalencesForJoinRestrictions()
*
* Finally, as the name of the function reveals, the function returns true if all relations
* are joined on their partition keys. Otherwise, the function returns false.
*/ */
bool bool
AllRelationsJoinedOnPartitionKey(RelationRestrictionContext *restrictionContext, AllRelationsJoinedOnPartitionKey(PlannerRestrictionContext *plannerRestrictionContext)
JoinRestrictionContext *joinRestrictionContext)
{ {
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
List *relationRestrictionAttributeEquivalenceList = NIL; List *relationRestrictionAttributeEquivalenceList = NIL;
List *joinRestrictionAttributeEquivalenceList = NIL; List *joinRestrictionAttributeEquivalenceList = NIL;
List *allAttributeEquivalenceList = NIL; List *allAttributeEquivalenceList = NIL;
@ -127,7 +134,6 @@ AllRelationsJoinedOnPartitionKey(RelationRestrictionContext *restrictionContext,
uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext); uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext);
uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList); uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList);
uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount; uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount;
ListCell *commonEqClassCell = NULL; ListCell *commonEqClassCell = NULL;
ListCell *relationRestrictionCell = NULL; ListCell *relationRestrictionCell = NULL;
Relids commonRteIdentities = NULL; Relids commonRteIdentities = NULL;
@ -170,8 +176,9 @@ AllRelationsJoinedOnPartitionKey(RelationRestrictionContext *restrictionContext,
/* add the rte indexes of relations to a bitmap */ /* add the rte indexes of relations to a bitmap */
foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes) foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes)
{ {
AttributeEquivalenceClassMember *classMember = lfirst(commonEqClassCell); AttributeEquivalenceClassMember *classMember =
int rteIdentity = classMember->rteIdendity; (AttributeEquivalenceClassMember *) lfirst(commonEqClassCell);
int rteIdentity = classMember->rteIdentity;
commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity); commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity);
} }
@ -179,7 +186,8 @@ AllRelationsJoinedOnPartitionKey(RelationRestrictionContext *restrictionContext,
/* check whether all relations exists in the main restriction list */ /* check whether all relations exists in the main restriction list */
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{ {
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
int rteIdentity = GetRTEIdentity(relationRestriction->rte); int rteIdentity = GetRTEIdentity(relationRestriction->rte);
if (PartitionKey(relationRestriction->relationId) && if (PartitionKey(relationRestriction->relationId) &&
@ -205,7 +213,8 @@ ReferenceRelationCount(RelationRestrictionContext *restrictionContext)
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{ {
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE) if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
{ {
@ -243,21 +252,23 @@ GenerateAttributeEquivalencesForRelationRestrictions(RelationRestrictionContext
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{ {
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
List *equivalenceClasses = relationRestriction->plannerInfo->eq_classes; List *equivalenceClasses = relationRestriction->plannerInfo->eq_classes;
ListCell *equivilanceClassCell = NULL; ListCell *equivalenceClassCell = NULL;
foreach(equivilanceClassCell, equivalenceClasses) foreach(equivalenceClassCell, equivalenceClasses)
{ {
EquivalenceClass *plannerEqClass = lfirst(equivilanceClassCell); EquivalenceClass *plannerEqClass =
(EquivalenceClass *) lfirst(equivalenceClassCell);
AttributeEquivalenceClass *attributeEquivalance = AttributeEquivalenceClass *attributeEquivalance =
AttributeEquivalenceClassForEquivalenceClass(plannerEqClass, AttributeEquivalenceClassForEquivalenceClass(plannerEqClass,
relationRestriction); relationRestriction);
attributeEquivalenceList = attributeEquivalenceList =
AddAttributeClassToAttributeClassList(attributeEquivalance, AddAttributeClassToAttributeClassList(attributeEquivalenceList,
attributeEquivalenceList); attributeEquivalance);
} }
} }
@ -288,7 +299,8 @@ AttributeEquivalenceClassForEquivalenceClass(EquivalenceClass *plannerEqClass,
foreach(equivilanceMemberCell, plannerEqClass->ec_members) foreach(equivilanceMemberCell, plannerEqClass->ec_members)
{ {
EquivalenceMember *equivalenceMember = lfirst(equivilanceMemberCell); EquivalenceMember *equivalenceMember =
(EquivalenceMember *) lfirst(equivilanceMemberCell);
Node *equivalenceNode = strip_implicit_coercions( Node *equivalenceNode = strip_implicit_coercions(
(Node *) equivalenceMember->em_expr); (Node *) equivalenceMember->em_expr);
Expr *strippedEquivalenceExpr = (Expr *) equivalenceNode; Expr *strippedEquivalenceExpr = (Expr *) equivalenceNode;
@ -304,17 +316,16 @@ AttributeEquivalenceClassForEquivalenceClass(EquivalenceClass *plannerEqClass,
equivalenceParam); equivalenceParam);
if (expressionVar) if (expressionVar)
{ {
AddToAttributeEquivalenceClass( AddToAttributeEquivalenceClass(&attributeEquivalance,
relationRestriction->parentPlannerInfo, relationRestriction->parentPlannerInfo,
expressionVar, expressionVar);
&attributeEquivalance);
} }
} }
else if (IsA(strippedEquivalenceExpr, Var)) else if (IsA(strippedEquivalenceExpr, Var))
{ {
expressionVar = (Var *) strippedEquivalenceExpr; expressionVar = (Var *) strippedEquivalenceExpr;
AddToAttributeEquivalenceClass(plannerInfo, expressionVar, AddToAttributeEquivalenceClass(&attributeEquivalance, plannerInfo,
&attributeEquivalance); expressionVar);
} }
} }
@ -368,7 +379,8 @@ GetVarFromAssignedParam(List *parentPlannerParamList, Param *plannerParam)
foreach(plannerParameterCell, parentPlannerParamList) foreach(plannerParameterCell, parentPlannerParamList)
{ {
PlannerParamItem *plannerParamItem = lfirst(plannerParameterCell); PlannerParamItem *plannerParamItem =
(PlannerParamItem *) lfirst(plannerParameterCell);
if (plannerParamItem->paramId != plannerParam->paramid) if (plannerParamItem->paramId != plannerParam->paramid)
{ {
@ -498,7 +510,8 @@ ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass **firstClass
foreach(equivalenceClassMemberCell, equivalenceMemberList) foreach(equivalenceClassMemberCell, equivalenceMemberList)
{ {
AttributeEquivalenceClassMember *newEqMember = lfirst(equivalenceClassMemberCell); AttributeEquivalenceClassMember *newEqMember =
(AttributeEquivalenceClassMember *) lfirst(equivalenceClassMemberCell);
if (AttributeClassContainsAttributeClassMember(newEqMember, *firstClass)) if (AttributeClassContainsAttributeClassMember(newEqMember, *firstClass))
{ {
@ -532,7 +545,8 @@ GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext *
foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList) foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList)
{ {
JoinRestriction *joinRestriction = lfirst(joinRestrictionCell); JoinRestriction *joinRestriction =
(JoinRestriction *) lfirst(joinRestrictionCell);
ListCell *restrictionInfoList = NULL; ListCell *restrictionInfoList = NULL;
foreach(restrictionInfoList, joinRestriction->joinRestrictInfoList) foreach(restrictionInfoList, joinRestriction->joinRestrictInfoList)
@ -581,14 +595,15 @@ GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext *
attributeEquivalance = palloc0(sizeof(AttributeEquivalenceClass)); attributeEquivalance = palloc0(sizeof(AttributeEquivalenceClass));
attributeEquivalance->equivalenceId = attributeEquivalenceId++; attributeEquivalance->equivalenceId = attributeEquivalenceId++;
AddToAttributeEquivalenceClass(joinRestriction->plannerInfo, leftVar, AddToAttributeEquivalenceClass(&attributeEquivalance,
&attributeEquivalance); joinRestriction->plannerInfo, leftVar);
AddToAttributeEquivalenceClass(joinRestriction->plannerInfo, rightVar,
&attributeEquivalance); AddToAttributeEquivalenceClass(&attributeEquivalance,
joinRestriction->plannerInfo, rightVar);
attributeEquivalenceList = attributeEquivalenceList =
AddAttributeClassToAttributeClassList(attributeEquivalance, AddAttributeClassToAttributeClassList(attributeEquivalenceList,
attributeEquivalenceList); attributeEquivalance);
} }
} }
@ -624,8 +639,8 @@ GenerateAttributeEquivalencesForJoinRestrictions(JoinRestrictionContext *
* This implies that there wouldn't be any columns for reference tables. * This implies that there wouldn't be any columns for reference tables.
*/ */
static void static void
AddToAttributeEquivalenceClass(PlannerInfo *root, Var *varToBeAdded, AddToAttributeEquivalenceClass(AttributeEquivalenceClass **attributeEquivalanceClass,
AttributeEquivalenceClass **attributeEquivalanceClass) PlannerInfo *root, Var *varToBeAdded)
{ {
RangeTblEntry *rangeTableEntry = root->simple_rte_array[varToBeAdded->varno]; RangeTblEntry *rangeTableEntry = root->simple_rte_array[varToBeAdded->varno];
@ -650,7 +665,7 @@ AddToAttributeEquivalenceClass(PlannerInfo *root, Var *varToBeAdded,
attributeEqMember->varattno = varToBeAdded->varattno; attributeEqMember->varattno = varToBeAdded->varattno;
attributeEqMember->varno = varToBeAdded->varno; attributeEqMember->varno = varToBeAdded->varno;
attributeEqMember->rteIdendity = GetRTEIdentity(rangeTableEntry); attributeEqMember->rteIdentity = GetRTEIdentity(rangeTableEntry);
attributeEqMember->relationId = rangeTableEntry->relid; attributeEqMember->relationId = rangeTableEntry->relid;
(*attributeEquivalanceClass)->equivalentAttributes = (*attributeEquivalanceClass)->equivalentAttributes =
@ -669,6 +684,12 @@ AddToAttributeEquivalenceClass(PlannerInfo *root, Var *varToBeAdded,
return; return;
} }
/* we also don't want to process ctid, tableoid etc */
if (varToBeAdded->varattno < InvalidAttrNumber)
{
return;
}
baseRelOptInfo = find_base_rel(root, varToBeAdded->varno); baseRelOptInfo = find_base_rel(root, varToBeAdded->varno);
/* If the subquery hasn't been planned yet, we have to punt */ /* If the subquery hasn't been planned yet, we have to punt */
@ -710,17 +731,17 @@ AddToAttributeEquivalenceClass(PlannerInfo *root, Var *varToBeAdded,
RangeTblRef *rightRangeTableReference = (RangeTblRef *) unionStatement->rarg; RangeTblRef *rightRangeTableReference = (RangeTblRef *) unionStatement->rarg;
varToBeAdded->varno = leftRangeTableReference->rtindex; varToBeAdded->varno = leftRangeTableReference->rtindex;
AddToAttributeEquivalenceClass(baseRelOptInfo->subroot, varToBeAdded, AddToAttributeEquivalenceClass(attributeEquivalanceClass,
attributeEquivalanceClass); baseRelOptInfo->subroot, varToBeAdded);
varToBeAdded->varno = rightRangeTableReference->rtindex; varToBeAdded->varno = rightRangeTableReference->rtindex;
AddToAttributeEquivalenceClass(baseRelOptInfo->subroot, varToBeAdded, AddToAttributeEquivalenceClass(attributeEquivalanceClass,
attributeEquivalanceClass); baseRelOptInfo->subroot, varToBeAdded);
} }
else if (varToBeAdded && IsA(varToBeAdded, Var) && varToBeAdded->varlevelsup == 0) else if (varToBeAdded && IsA(varToBeAdded, Var) && varToBeAdded->varlevelsup == 0)
{ {
AddToAttributeEquivalenceClass(baseRelOptInfo->subroot, varToBeAdded, AddToAttributeEquivalenceClass(attributeEquivalanceClass,
attributeEquivalanceClass); baseRelOptInfo->subroot, varToBeAdded);
} }
} }
} }
@ -739,8 +760,9 @@ AttributeClassContainsAttributeClassMember(AttributeEquivalenceClassMember *inpu
ListCell *classCell = NULL; ListCell *classCell = NULL;
foreach(classCell, attributeEquivalenceClass->equivalentAttributes) foreach(classCell, attributeEquivalenceClass->equivalentAttributes)
{ {
AttributeEquivalenceClassMember *memberOfClass = lfirst(classCell); AttributeEquivalenceClassMember *memberOfClass =
if (memberOfClass->rteIdendity == inputMember->rteIdendity && (AttributeEquivalenceClassMember *) lfirst(classCell);
if (memberOfClass->rteIdentity == inputMember->rteIdentity &&
memberOfClass->varattno == inputMember->varattno) memberOfClass->varattno == inputMember->varattno)
{ {
return true; return true;
@ -760,8 +782,8 @@ AttributeClassContainsAttributeClassMember(AttributeEquivalenceClassMember *inpu
* not contribute to our purposes, we skip such classed adding to the list. * not contribute to our purposes, we skip such classed adding to the list.
*/ */
static List * static List *
AddAttributeClassToAttributeClassList(AttributeEquivalenceClass *attributeEquivalance, AddAttributeClassToAttributeClassList(List *attributeEquivalenceList,
List *attributeEquivalenceList) AttributeEquivalenceClass *attributeEquivalance)
{ {
List *equivalentAttributes = NULL; List *equivalentAttributes = NULL;

View File

@ -32,8 +32,7 @@
#include "utils/memutils.h" #include "utils/memutils.h"
static List *relationRestrictionContextList = NIL; static List *plannerRestrictionContextList = NIL;
static List *joinRestrictionContextList = NIL;
/* create custom scan methods for separate executors */ /* create custom scan methods for separate executors */
static CustomScanMethods RealTimeCustomScanMethods = { static CustomScanMethods RealTimeCustomScanMethods = {
@ -60,8 +59,8 @@ static CustomScanMethods DelayedErrorCustomScanMethods = {
/* local function forward declarations */ /* local function forward declarations */
static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams, Query *query, ParamListInfo boundParams,
RelationRestrictionContext *restrictionContext, PlannerRestrictionContext *
JoinRestrictionContext *joinRestrictionContext); plannerRestrictionContext);
static void AssignRTEIdentities(Query *queryTree); static void AssignRTEIdentities(Query *queryTree);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static Node * SerializeMultiPlan(struct MultiPlan *multiPlan); static Node * SerializeMultiPlan(struct MultiPlan *multiPlan);
@ -72,10 +71,10 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *mu
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static void CheckNodeIsDumpable(Node *node); static void CheckNodeIsDumpable(Node *node);
static List * CopyPlanParamList(List *originalPlanParamList); static List * CopyPlanParamList(List *originalPlanParamList);
static void CreateAndPushPlannerContexts(void); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static RelationRestrictionContext * CurrentRestrictionContext(void); static RelationRestrictionContext * CurrentRelationRestrictionContext(void);
static JoinRestrictionContext * CurrentJoinRestrictionContext(void); static JoinRestrictionContext * CurrentJoinRestrictionContext(void);
static void PopRestrictionContexts(void); static void PopPlannerRestrictionContext(void);
static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
@ -86,8 +85,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
PlannedStmt *result = NULL; PlannedStmt *result = NULL;
bool needsDistributedPlanning = NeedsDistributedPlanning(parse); bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
Query *originalQuery = NULL; Query *originalQuery = NULL;
RelationRestrictionContext *relationRestrictionContext = NULL; PlannerRestrictionContext *plannerRestrictionContext = NULL;
JoinRestrictionContext *joinRestrictionContext = NULL;
/* /*
* standard_planner scribbles on it's input, but for deparsing we need the * standard_planner scribbles on it's input, but for deparsing we need the
@ -101,10 +99,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
/* create a restriction context and put it at the end if context list */ /* create a restriction context and put it at the end if context list */
CreateAndPushPlannerContexts(); plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
relationRestrictionContext = CurrentRestrictionContext();
joinRestrictionContext = CurrentJoinRestrictionContext();
PG_TRY(); PG_TRY();
{ {
@ -118,19 +113,18 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
result = CreateDistributedPlan(result, originalQuery, parse, result = CreateDistributedPlan(result, originalQuery, parse,
boundParams, relationRestrictionContext, boundParams, plannerRestrictionContext);
joinRestrictionContext);
} }
} }
PG_CATCH(); PG_CATCH();
{ {
PopRestrictionContexts(); PopPlannerRestrictionContext();
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
/* remove the context from the context list */ /* remove the context from the context list */
PopRestrictionContexts(); PopPlannerRestrictionContext();
return result; return result;
} }
@ -187,6 +181,18 @@ AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
} }
/* GetRTEIdentity returns the identity assigned with AssignRTEIdentity. */
int
GetRTEIdentity(RangeTblEntry *rte)
{
Assert(rte->rtekind == RTE_RELATION);
Assert(IsA(rte->values_lists, IntList));
Assert(list_length(rte->values_lists) == 1);
return linitial_int(rte->values_lists);
}
/* /*
* IsModifyCommand returns true if the query performs modifications, false * IsModifyCommand returns true if the query performs modifications, false
* otherwise. * otherwise.
@ -232,8 +238,7 @@ IsModifyMultiPlan(MultiPlan *multiPlan)
static PlannedStmt * static PlannedStmt *
CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query,
ParamListInfo boundParams, ParamListInfo boundParams,
RelationRestrictionContext *restrictionContext, PlannerRestrictionContext *plannerRestrictionContext)
JoinRestrictionContext *joinRestrictionContext)
{ {
MultiPlan *distributedPlan = NULL; MultiPlan *distributedPlan = NULL;
PlannedStmt *resultPlan = NULL; PlannedStmt *resultPlan = NULL;
@ -247,8 +252,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
if (IsModifyCommand(query)) if (IsModifyCommand(query))
{ {
/* modifications are always routed through the same planner/executor */ /* modifications are always routed through the same planner/executor */
distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext, distributedPlan =
joinRestrictionContext); CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
Assert(distributedPlan); Assert(distributedPlan);
} }
@ -262,7 +267,11 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
*/ */
if (EnableRouterExecution) if (EnableRouterExecution)
{ {
distributedPlan = CreateRouterPlan(originalQuery, query, restrictionContext); RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
distributedPlan = CreateRouterPlan(originalQuery, query,
relationRestrictionContext);
/* for debugging it's useful to display why query was not router plannable */ /* for debugging it's useful to display why query was not router plannable */
if (distributedPlan && distributedPlan->planningError) if (distributedPlan && distributedPlan->planningError)
@ -639,7 +648,6 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestriction->joinRestrictInfoList = restrictInfoList; joinRestriction->joinRestrictInfoList = restrictInfoList;
joinRestriction->plannerInfo = root; joinRestriction->plannerInfo = root;
joinContext->joinRestrictionList = joinContext->joinRestrictionList =
lappend(joinContext->joinRestrictionList, joinRestriction); lappend(joinContext->joinRestrictionList, joinRestriction);
} }
@ -668,7 +676,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
distributedTable = IsDistributedTable(rte->relid); distributedTable = IsDistributedTable(rte->relid);
localTable = !distributedTable; localTable = !distributedTable;
restrictionContext = CurrentRestrictionContext(); restrictionContext = CurrentRelationRestrictionContext();
Assert(restrictionContext != NULL); Assert(restrictionContext != NULL);
relationRestriction = palloc0(sizeof(RelationRestriction)); relationRestriction = palloc0(sizeof(RelationRestriction));
@ -708,18 +716,6 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
} }
/* GetRTEIdentity returns the identity assigned with AssignRTEIdentity. */
int
GetRTEIdentity(RangeTblEntry *rte)
{
Assert(rte->rtekind == RTE_RELATION);
Assert(IsA(rte->values_lists, IntList));
Assert(list_length(rte->values_lists) == 1);
return linitial_int(rte->values_lists);
}
/* /*
* CopyPlanParamList deep copies the input PlannerParamItem list and returns the newly * CopyPlanParamList deep copies the input PlannerParamItem list and returns the newly
* allocated list. * allocated list.
@ -748,43 +744,50 @@ CopyPlanParamList(List *originalPlanParamList)
/* /*
* CreateAndPushPlannerContextes creates a new restriction context and a new join context, * CreateAndPushPlannerRestrictionContext creates a new relation restriction context
* inserts it to the beginning of the respective context lists. * and a new join context, inserts it to the beginning of the
* plannerRestrictionContextList.
*/ */
static void static PlannerRestrictionContext *
CreateAndPushPlannerContexts(void) CreateAndPushPlannerRestrictionContext(void)
{ {
RelationRestrictionContext *restrictionContext = PlannerRestrictionContext *plannerRestrictionContext =
palloc0(sizeof(PlannerRestrictionContext));
plannerRestrictionContext->relationRestrictionContext =
palloc0(sizeof(RelationRestrictionContext)); palloc0(sizeof(RelationRestrictionContext));
JoinRestrictionContext *joinContext = plannerRestrictionContext->joinRestrictionContext =
palloc0(sizeof(JoinRestrictionContext)); palloc0(sizeof(JoinRestrictionContext));
/* we'll apply logical AND as we add tables */ /* we'll apply logical AND as we add tables */
restrictionContext->allReferenceTables = true; plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
relationRestrictionContextList = lcons(restrictionContext, plannerRestrictionContextList = lcons(plannerRestrictionContext,
relationRestrictionContextList); plannerRestrictionContextList);
joinRestrictionContextList = lcons(joinContext,
joinRestrictionContextList); return plannerRestrictionContext;
} }
/* /*
* CurrentRestrictionContext returns the the last restriction context from the * CurrentRestrictionContext returns the the last restriction context from the
* list. * relationRestrictionContext list.
*/ */
static RelationRestrictionContext * static RelationRestrictionContext *
CurrentRestrictionContext(void) CurrentRelationRestrictionContext(void)
{ {
RelationRestrictionContext *restrictionContext = NULL; PlannerRestrictionContext *plannerRestrictionContext = NULL;
RelationRestrictionContext *relationRestrictionContext = NULL;
Assert(relationRestrictionContextList != NIL); Assert(plannerRestrictionContextList != NIL);
restrictionContext = plannerRestrictionContext =
(RelationRestrictionContext *) linitial(relationRestrictionContextList); (PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
return restrictionContext; relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext;
return relationRestrictionContext;
} }
@ -795,25 +798,28 @@ CurrentRestrictionContext(void)
static JoinRestrictionContext * static JoinRestrictionContext *
CurrentJoinRestrictionContext(void) CurrentJoinRestrictionContext(void)
{ {
JoinRestrictionContext *joinContext = NULL; PlannerRestrictionContext *plannerRestrictionContext = NULL;
JoinRestrictionContext *joinRestrictionContext = NULL;
Assert(joinRestrictionContextList != NIL); Assert(plannerRestrictionContextList != NIL);
joinContext = (JoinRestrictionContext *) linitial(joinRestrictionContextList); plannerRestrictionContext =
(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
return joinContext; joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext;
return joinRestrictionContext;
} }
/* /*
* PopRestrictionContexts removes the most recently added restriction contexts from * PopPlannerRestrictionContext removes the most recently added restriction contexts from
* the restriction and join context lists. The function assumes the lists are not empty. * the planner restriction context list. The function assumes the list is not empty.
*/ */
static void static void
PopRestrictionContexts(void) PopPlannerRestrictionContext(void)
{ {
relationRestrictionContextList = list_delete_first(relationRestrictionContextList); plannerRestrictionContextList = list_delete_first(plannerRestrictionContextList);
joinRestrictionContextList = list_delete_first(joinRestrictionContextList);
} }

View File

@ -85,11 +85,8 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery, static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery,
RelationRestrictionContext * PlannerRestrictionContext *
restrictionContext, plannerRestrictionContext);
JoinRestrictionContext *
joinRestrictionContext);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery, static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
ShardInterval *shardInterval, ShardInterval *shardInterval,
RelationRestrictionContext * RelationRestrictionContext *
@ -171,18 +168,19 @@ CreateRouterPlan(Query *originalQuery, Query *query,
*/ */
MultiPlan * MultiPlan *
CreateModifyPlan(Query *originalQuery, Query *query, CreateModifyPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext, PlannerRestrictionContext *plannerRestrictionContext)
JoinRestrictionContext *joinRestrictionContext)
{ {
if (InsertSelectQuery(originalQuery)) if (InsertSelectQuery(originalQuery))
{ {
return CreateInsertSelectRouterPlan(originalQuery, restrictionContext, return CreateInsertSelectRouterPlan(originalQuery, plannerRestrictionContext);
joinRestrictionContext);
} }
else else
{ {
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
return CreateSingleTaskRouterPlan(originalQuery, query, return CreateSingleTaskRouterPlan(originalQuery, query,
restrictionContext); relationRestrictionContext);
} }
} }
@ -266,8 +264,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
*/ */
static MultiPlan * static MultiPlan *
CreateInsertSelectRouterPlan(Query *originalQuery, CreateInsertSelectRouterPlan(Query *originalQuery,
RelationRestrictionContext *restrictionContext, PlannerRestrictionContext *plannerRestrictionContext)
JoinRestrictionContext *joinRestrictionContext)
{ {
int shardOffset = 0; int shardOffset = 0;
List *sqlTaskList = NIL; List *sqlTaskList = NIL;
@ -280,7 +277,9 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength; int shardCount = targetCacheEntry->shardIntervalArrayLength;
bool allReferenceTables = restrictionContext->allReferenceTables; RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool allRelationsJoinedOnPartitionKey = false; bool allRelationsJoinedOnPartitionKey = false;
multiPlan->operation = originalQuery->commandType; multiPlan->operation = originalQuery->commandType;
@ -298,7 +297,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
} }
allRelationsJoinedOnPartitionKey = allRelationsJoinedOnPartitionKey =
AllRelationsJoinedOnPartitionKey(restrictionContext, joinRestrictionContext); AllRelationsJoinedOnPartitionKey(plannerRestrictionContext);
/* /*
* Plan select query for each shard in the target table. Do so by replacing the * Plan select query for each shard in the target table. Do so by replacing the
@ -316,7 +315,8 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
Task *modifyTask = NULL; Task *modifyTask = NULL;
modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval, modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval,
restrictionContext, taskIdIndex, relationRestrictionContext,
taskIdIndex,
allRelationsJoinedOnPartitionKey); allRelationsJoinedOnPartitionKey);
/* add the task if it could be created */ /* add the task if it could be created */

View File

@ -15,10 +15,8 @@
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
extern bool AllRelationsJoinedOnPartitionKey(RelationRestrictionContext * extern bool AllRelationsJoinedOnPartitionKey(PlannerRestrictionContext *
restrictionContext, plannerRestrictionContext);
JoinRestrictionContext *
joinRestrictionContext);
#endif /* MULTI_COLOCATED_SUBQUERY_PLANNER_H */ #endif /* MULTI_COLOCATED_SUBQUERY_PLANNER_H */

View File

@ -54,6 +54,12 @@ typedef struct JoinRestriction
PlannerInfo *plannerInfo; PlannerInfo *plannerInfo;
} JoinRestriction; } JoinRestriction;
typedef struct PlannerRestrictionContext
{
RelationRestrictionContext *relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext;
} PlannerRestrictionContext;
typedef struct RelationShard typedef struct RelationShard
{ {
CitusNode type; CitusNode type;

View File

@ -29,8 +29,8 @@ extern bool EnableRouterExecution;
extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext, PlannerRestrictionContext *
JoinRestrictionContext *joinRestrictionContext); plannerRestrictionContext);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,