From 5b0baea72cab9e129c9448a658f9327cadbe2f2d Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 27 Dec 2019 13:47:49 +0100 Subject: [PATCH] Refactor distributed_planner for better understandability --- .../distributed/planner/distributed_planner.c | 247 ++++++++++-------- .../planner/function_call_delegation.c | 32 ++- .../planner/multi_router_planner.c | 50 ++-- src/include/distributed/distributed_planner.h | 30 +++ .../distributed/function_call_delegation.h | 4 +- 5 files changed, 201 insertions(+), 162 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 7d3752e96..4aef0b89c 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,14 +66,10 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; - static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); -static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, - Query *originalQuery, Query *query, - ParamListInfo boundParams, - PlannerRestrictionContext * - plannerRestrictionContext); +static PlannedStmt * CreateDistributedPlannedStmt( + DistributedPlanningContext *planContext); static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, @@ -88,8 +84,6 @@ static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AdjustPartitioningForDistributedPlanning(List *rangeTableList, bool setPartitionedTablesInherited); -static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, - DistributedPlan *distributedPlan); static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan 
*customScan); @@ -117,6 +111,10 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); +static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, + Const *distributionKeyValue); +static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, + List *rangeTableList, int rteIdCounter); /* Distributed planner hook */ PlannedStmt * @@ -124,12 +122,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result = NULL; bool needsDistributedPlanning = false; - Query *originalQuery = NULL; bool setPartitionedTablesInherited = false; List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; bool fastPathRouterQuery = false; Const *distributionKeyValue = NULL; + DistributedPlanningContext planContext = { + .query = parse, + .cursorOptions = cursorOptions, + .boundParams = boundParams, + }; if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { @@ -160,7 +162,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } } - if (needsDistributedPlanning) + if (fastPathRouterQuery) + { + /* + * We need to copy the parse tree because the FastPathPlanner modifies + * it. In the next branch we do the same for other distributed queries + * too, but for those it needs to be done AFTER calling + * AssignRTEIdentities. + */ + planContext.originalQuery = copyObject(parse); + } + else if (needsDistributedPlanning) { /* * Inserting into a local table needs to go through the regular postgres @@ -168,7 +180,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * don't have a way of doing both things and therefore error out, but do * have a handy tip for users. 
*/ - if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse)) + if (InsertSelectIntoLocalTable(parse)) { ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " "local table"), @@ -179,31 +191,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* * standard_planner scribbles on it's input, but for deparsing we need the - * unmodified form. Note that we keep RTE_RELATIONs with their identities - * set, which doesn't break our goals, but, prevents us keeping an extra copy - * of the query tree. Note that we copy the query tree once we're sure it's a - * distributed query. - * - * Since fast-path queries do not through standard planner, we skip unnecessary - * parts in that case. + * unmodified form. Note that before copying we call + * AssignRTEIdentities, which is needed because these identities need + * to be present in the copied query too. */ - if (!fastPathRouterQuery) - { - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - originalQuery = copyObject(parse); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + planContext.originalQuery = copyObject(parse); - setPartitionedTablesInherited = false; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - else - { - /* - * We still need to copy the parse tree because the FastPathPlanner - * modifies it. 
- */ - originalQuery = copyObject(parse); - } + setPartitionedTablesInherited = false; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); } /* @@ -213,83 +210,42 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ReplaceTableVisibleFunction((Node *) parse); /* create a restriction context and put it at the end if context list */ - PlannerRestrictionContext *plannerRestrictionContext = - CreateAndPushPlannerRestrictionContext(); + planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + + /* + * We keep track of how many times we've recursed into the planner, primarily + * to detect whether we are in a function call. We need to make sure that the + * PlannerLevel is decremented exactly once at the end of the next PG_TRY + * block, both in the happy case and when an error occurs. + */ + PlannerLevel++; + PG_TRY(); { - /* - * We keep track of how many times we've recursed into the planner, primarily - * to detect whether we are in a function call. We need to make sure that the - * PlannerLevel is decremented exactly once at the end of this PG_TRY block, - * both in the happy case and when an error occurs. - */ - PlannerLevel++; - - /* - * For trivial queries, we're skipping the standard_planner() in - * order to eliminate its overhead. - * - * Otherwise, call into standard planner. This is required because the Citus - * planner relies on both the restriction information per table and parse tree - * transformations made by postgres' planner. 
- */ - - if (needsDistributedPlanning && fastPathRouterQuery) + if (fastPathRouterQuery) { - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = - true; - plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = - distributionKeyValue; - - result = FastPathPlanner(originalQuery, parse, boundParams); + result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); } else { - result = standard_planner(parse, cursorOptions, boundParams); - + /* + * Call into standard_planner because the Citus planner relies on both the + * restriction information per table and parse tree transformations made by + * postgres' planner. + */ + planContext.plan = standard_planner(planContext.query, + planContext.cursorOptions, + planContext.boundParams); if (needsDistributedPlanning) { - /* may've inlined new relation rtes */ - rangeTableList = ExtractRangeTableEntryList(parse); - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter); + } + else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL) + { + result = planContext.plan; } } - - if (needsDistributedPlanning) - { - uint64 planId = NextPlanId++; - - result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse, - boundParams, plannerRestrictionContext); - - if (!fastPathRouterQuery) - { - setPartitionedTablesInherited = true; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - } - else - { - bool hasExternParam = false; - DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse, - &hasExternParam); - if (delegatePlan != NULL) - { - result = FinalizePlan(result, delegatePlan); - } - else if (hasExternParam) - { - /* - * As in CreateDistributedPlannedStmt, try dissuade planner when planning - * potentially failed due to unresolved prepared statement parameters. 
- */ - result->planTree->total_cost = FLT_MAX / 100000000; - } - } - - PlannerLevel--; } PG_CATCH(); { @@ -301,6 +257,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } PG_END_TRY(); + PlannerLevel--; /* remove the context from the context list */ PopPlannerRestrictionContext(); @@ -578,30 +535,91 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) } +/* + * PlanFastPathDistributedStmt creates a distributed planned statement using + * the FastPathPlanner. + */ +static PlannedStmt * +PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, + Const *distributionKeyValue) +{ + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + fastPathRouterQuery = true; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + distributionKeyValue = distributionKeyValue; + + planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, + planContext->boundParams); + + return CreateDistributedPlannedStmt(planContext); +} + + +/* + * PlanDistributedStmt creates a distributed planned statement using the PG + * planner. + */ +static PlannedStmt * +PlanDistributedStmt(DistributedPlanningContext *planContext, + List *rangeTableList, + int rteIdCounter) +{ + /* may've inlined new relation rtes */ + rangeTableList = ExtractRangeTableEntryList(planContext->query); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + + + PlannedStmt *result = CreateDistributedPlannedStmt(planContext); + + bool setPartitionedTablesInherited = true; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); + + return result; +} + + +/* + * DissuadePlannerFromUsingPlan tries to dissuade the planner from using a plan that + * potentially failed due to unresolved prepared statement parameters. 
+ */ +void +DissuadePlannerFromUsingPlan(PlannedStmt *plan) +{ + /* + * Arbitrarily high cost, but low enough that it can be added up + * without overflowing by choose_custom_plan(). + */ + plan->planTree->total_cost = FLT_MAX / 100000000; +} + + /* * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular * query into a distributed plan that is encapsulated by a PlannedStmt. */ static PlannedStmt * -CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, - Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) +CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) { + uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; JoinRestrictionContext *joinRestrictionContext = - plannerRestrictionContext->joinRestrictionContext; + planContext->plannerRestrictionContext->joinRestrictionContext; - if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams)) + if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery, + planContext->boundParams)) { hasUnresolvedParams = true; } - plannerRestrictionContext->joinRestrictionContext = + planContext->plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); DistributedPlan *distributedPlan = - CreateDistributedPlan(planId, originalQuery, query, boundParams, - hasUnresolvedParams, plannerRestrictionContext); + CreateDistributedPlan(planId, planContext->originalQuery, planContext->query, + planContext->boundParams, + hasUnresolvedParams, + planContext->plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. 
@@ -646,7 +664,7 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - PlannedStmt *resultPlan = FinalizePlan(localPlan, distributedPlan); + PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -654,14 +672,11 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && + (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + distributedPlan))) && hasUnresolvedParams) { - /* - * Arbitrarily high cost, but low enough that it can be added up - * without overflowing by choose_custom_plan(). - */ - resultPlan->planTree->total_cost = FLT_MAX / 100000000; + DissuadePlannerFromUsingPlan(resultPlan); } return resultPlan; @@ -1099,7 +1114,7 @@ GetDistributedPlan(CustomScan *customScan) * FinalizePlan combines local plan with distributed plan and creates a plan * which can be run by the PostgreSQL executor. */ -static PlannedStmt * +PlannedStmt * FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) { PlannedStmt *finalPlan = NULL; diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 8201abb32..0a4c5a8da 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -95,8 +95,8 @@ contain_param_walker(Node *node, void *context) * forms involving multiple function calls, FROM clauses, WHERE clauses, * ... Those complex forms are handled in the coordinator. 
*/ -DistributedPlan * -TryToDelegateFunctionCall(Query *query, bool *hasExternParam) +PlannedStmt * +TryToDelegateFunctionCall(DistributedPlanningContext *planContext) { List *targetList = NIL; TargetEntry *targetEntry = NULL; @@ -114,12 +114,9 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) StringInfo queryString = NULL; Task *task = NULL; Job *job = NULL; - DistributedPlan *distributedPlan = NULL; + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); struct ParamWalkerContext walkerParamContext = { 0 }; - /* set hasExternParam now in case of early exit */ - *hasExternParam = false; - if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4)) { /* Citus is not ready to determine whether function is distributed */ @@ -133,19 +130,19 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) return NULL; } - if (query == NULL) + if (planContext->query == NULL) { /* no query (mostly here to be defensive) */ return NULL; } - if (query->commandType != CMD_SELECT) + if (planContext->query->commandType != CMD_SELECT) { /* not a SELECT */ return NULL; } - FromExpr *joinTree = query->jointree; + FromExpr *joinTree = planContext->query->jointree; if (joinTree == NULL) { /* no join tree (mostly here to be defensive) */ @@ -174,7 +171,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (IsA(reference, RangeTblRef)) { - RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable); + RangeTblEntry *rtentry = rt_fetch(reference->rtindex, + planContext->query->rtable); if (rtentry->rtekind != RTE_RESULT) { /* e.g. 
SELECT f() FROM rel */ @@ -203,8 +201,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) #endif } - targetList = query->targetList; - if (list_length(query->targetList) != 1) + targetList = planContext->query->targetList; + if (list_length(planContext->query->targetList) != 1) { /* multiple target list items */ return NULL; @@ -288,7 +286,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (partitionParam->paramkind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); return NULL; } } @@ -354,7 +352,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (walkerParamContext.paramKind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); } else { @@ -367,7 +365,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) ereport(DEBUG1, (errmsg("pushing down the function call"))); queryString = makeStringInfo(); - pg_get_query_def(query, queryString); + pg_get_query_def(planContext->query, queryString); task = CitusMakeNode(Task); task->taskType = SELECT_TASK; @@ -378,7 +376,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) job = CitusMakeNode(Job); job->jobId = UniqueJobId(); - job->jobQuery = query; + job->jobQuery = planContext->query; job->taskList = list_make1(task); distributedPlan = CitusMakeNode(DistributedPlan); @@ -390,5 +388,5 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) /* worker will take care of any necessary locking, treat query as read-only */ distributedPlan->modLevel = ROW_MODIFY_READONLY; - return distributedPlan; + return FinalizePlan(planContext->plan, distributedPlan); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a31320d62..59b368350 
100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -156,7 +156,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, static uint64 GetAnchorShardId(List *relationShardList); static List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, - bool *isMultiShardQuery); + bool *isMultiShardQuery, + Const *distributionKeyValue); static List * SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId); @@ -2043,31 +2044,12 @@ PlanRouterQuery(Query *originalQuery, */ if (fastPathRouterQuery) { - List *shardIntervalList = NIL; Const *distributionKeyValue = plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue; - if (distributionKeyValue) - { - Oid relationId = ExtractFirstDistributedTableId(originalQuery); - DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId); - ShardInterval *shardInterval = - FindShardInterval(distributionKeyValue->constvalue, cache); - - shardIntervalList = list_make1(shardInterval); - - if (partitionValueConst != NULL) - { - /* set the outgoing partition column value if requested */ - *partitionValueConst = distributionKeyValue; - } - } - else - { - shardIntervalList = - TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, - &isMultiShardQuery); - } + List *shardIntervalList = + TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, + &isMultiShardQuery, distributionKeyValue); /* @@ -2289,15 +2271,27 @@ GetAnchorShardId(List *prunedShardIntervalListList) */ static List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, - bool *isMultiShardQuery) + bool *isMultiShardQuery, Const *distributionKeyValue) { - Const *queryPartitionValueConst = NULL; - Oid relationId = ExtractFirstDistributedTableId(query); + + if (distributionKeyValue) + { + DistTableCacheEntry *cache = 
DistributedTableCacheEntry(relationId); + ShardInterval *shardInterval = + FindShardInterval(distributionKeyValue->constvalue, cache); + + if (partitionValueConst != NULL) + { + /* set the outgoing partition column value if requested */ + *partitionValueConst = distributionKeyValue; + } + return list_make1(shardInterval); + } + Node *quals = query->jointree->quals; - int relationIndex = 1; - + Const *queryPartitionValueConst = NULL; List *prunedShardIntervalList = PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals), &queryPartitionValueConst); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 8513b7d60..98d6f45a7 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -129,6 +129,33 @@ typedef struct RelationRowLock } RelationRowLock; +typedef struct DistributedPlanningContext +{ + /* The parsed query that is given to the planner. It is slightly modified + * to work with the standard_planner */ + Query *query; + + /* A copy of the original parsed query that is given to the planner. This + * doesn't contain most of the changes that are made to parse. There's one + * change that is made for non fast path router queries though, which + * is the assigning of RTE identities using AssignRTEIdentities. This is + * NULL for non distributed plans, since those don't need it. 
*/ + Query *originalQuery; + + /* the cursor options given to the planner */ + int cursorOptions; + + /* the ParamListInfo that is given to the planner */ + ParamListInfo boundParams; + + /* Plan created either by standard_planner or by FastPathPlanner */ + PlannedStmt *plan; + + /* Our custom restriction context */ + PlannerRestrictionContext *plannerRestrictionContext; +} DistributedPlanningContext; + + extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern List * ExtractRangeTableEntryList(Query *query); @@ -150,5 +177,8 @@ extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); extern int32 BlessRecordExpression(Expr *expr); +extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); +extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, + struct DistributedPlan *distributedPlan); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 5cb5e6058..865ac0ce1 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -10,10 +10,12 @@ #define FUNCTION_CALL_DELEGATION_H #include "postgres.h" + +#include "distributed/distributed_planner.h" #include "distributed/multi_physical_planner.h" -DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam); +PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); #endif /* FUNCTION_CALL_DELEGATION_H */