From b4381df8bed1f499e932982d46128bcebcc16e2b Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 27 Dec 2019 13:47:49 +0100 Subject: [PATCH] Refactor distributed_planner for better understandability --- .../distributed/planner/distributed_planner.c | 194 +++++++++++------- 1 file changed, 115 insertions(+), 79 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 7d3752e96..87d309f49 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -69,7 +69,7 @@ int PlannerLevel = 0; static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); -static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, +static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, PlannerRestrictionContext * @@ -118,6 +118,80 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); + +static PlannedStmt * +FastPastDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams, + Const *distributionKeyValue, + PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery) +{ + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = + true; + plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = + distributionKeyValue; + + PlannedStmt *fastPathPlan = FastPathPlanner(originalQuery, parse, boundParams); + + return CreateDistributedPlannedStmt(fastPathPlan, originalQuery, parse, + boundParams, plannerRestrictionContext); +} + + +static PlannedStmt * +NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams, + List *rangeTableList, + PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery, int rteIdCounter) +{ + /* + * Call into standard_planner because the Citus planner relies on both the + * restriction information per table and parse tree transformations made by + * postgres' planner. + */ + PlannedStmt *standardPlan = standard_planner(parse, cursorOptions, boundParams); + + /* may've inlined new relation rtes */ + rangeTableList = ExtractRangeTableEntryList(parse); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + + + PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, originalQuery, parse, + boundParams, + plannerRestrictionContext); + + bool setPartitionedTablesInherited = true; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); + + return result; +} + + +static PlannedStmt * +NonDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams) +{ + PlannedStmt *result = standard_planner(parse, cursorOptions, boundParams); + + bool hasExternParam = false; + DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse, + &hasExternParam); + if (delegatePlan != NULL) + { + result = FinalizePlan(result, delegatePlan); + } + else if (hasExternParam) + { + /* + * As in CreateDistributedPlannedStmt, try dissuade planner when planning + * potentially failed due to unresolved prepared statement parameters. + */ + result->planTree->total_cost = FLT_MAX / 100000000; + } + + return result; +} + + /* Distributed planner hook */ PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) @@ -160,7 +234,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } } - if (needsDistributedPlanning) + if (fastPathRouterQuery) + { + /* + * We need to copy the parse tree because the FastPathPlanner modifies + * it. + */ + originalQuery = copyObject(parse); + } + else if (needsDistributedPlanning) { /* * Inserting into a local table needs to go through the regular postgres @@ -168,7 +250,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * don't have a way of doing both things and therefore error out, but do * have a handy tip for users. */ - if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse)) + if (InsertSelectIntoLocalTable(parse)) { ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " "local table"), @@ -187,23 +269,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * Since fast-path queries do not through standard planner, we skip unnecessary * parts in that case. */ - if (!fastPathRouterQuery) - { - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - originalQuery = copyObject(parse); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + originalQuery = copyObject(parse); - setPartitionedTablesInherited = false; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - else - { - /* - * We still need to copy the parse tree because the FastPathPlanner - * modifies it. - */ - originalQuery = copyObject(parse); - } + setPartitionedTablesInherited = false; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); } /* @@ -216,16 +287,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PlannerRestrictionContext *plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + /* + * We keep track of how many times we've recursed into the planner, primarily + * to detect whether we are in a function call. We need to make sure that the + * PlannerLevel is decremented exactly once at the end of the next PG_TRY + * block, both in the happy case and when an error occurs. + */ + PlannerLevel++; + + PG_TRY(); { - /* - * We keep track of how many times we've recursed into the planner, primarily - * to detect whether we are in a function call. We need to make sure that the - * PlannerLevel is decremented exactly once at the end of this PG_TRY block, - * both in the happy case and when an error occurs. - */ - PlannerLevel++; - /* * For trivial queries, we're skipping the standard_planner() in * order to eliminate its overhead. @@ -234,62 +306,24 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * planner relies on both the restriction information per table and parse tree * transformations made by postgres' planner. */ - - if (needsDistributedPlanning && fastPathRouterQuery) + if (fastPathRouterQuery) { - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = - true; - plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = - distributionKeyValue; - - result = FastPathPlanner(originalQuery, parse, boundParams); + result = FastPastDistributedStmt(parse, cursorOptions, boundParams, + distributionKeyValue, + plannerRestrictionContext, + originalQuery); + } + else if (needsDistributedPlanning) + { + result = NonFastPathDistributedStmt(parse, cursorOptions, boundParams, + rangeTableList, + plannerRestrictionContext, originalQuery, + rteIdCounter); } else { - result = standard_planner(parse, cursorOptions, boundParams); - - if (needsDistributedPlanning) - { - /* may've inlined new relation rtes */ - rangeTableList = ExtractRangeTableEntryList(parse); - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - } + result = NonDistributedStmt(parse, cursorOptions, boundParams); } - - if (needsDistributedPlanning) - { - uint64 planId = NextPlanId++; - - result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse, - boundParams, plannerRestrictionContext); - - if (!fastPathRouterQuery) - { - setPartitionedTablesInherited = true; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - } - else - { - bool hasExternParam = false; - DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse, - &hasExternParam); - if (delegatePlan != NULL) - { - result = FinalizePlan(result, delegatePlan); - } - else if (hasExternParam) - { - /* - * As in CreateDistributedPlannedStmt, try dissuade planner when planning - * potentially failed due to unresolved prepared statement parameters. - */ - result->planTree->total_cost = FLT_MAX / 100000000; - } - } - - PlannerLevel--; } PG_CATCH(); { @@ -301,6 +335,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } PG_END_TRY(); + PlannerLevel--; /* remove the context from the context list */ PopPlannerRestrictionContext(); @@ -583,10 +618,11 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) * query into a distributed plan that is encapsulated by a PlannedStmt. */ static PlannedStmt * -CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, +CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, PlannerRestrictionContext *plannerRestrictionContext) { + uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; JoinRestrictionContext *joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext;