Refactor distributed_planner for better understandability

use-plancontext-more
Jelte Fennema 2019-12-27 13:47:49 +01:00
parent 5a1e752726
commit b4381df8be
1 changed files with 115 additions and 79 deletions

View File

@ -69,7 +69,7 @@ int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query); static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery, Query *query, Query *originalQuery, Query *query,
ParamListInfo boundParams, ParamListInfo boundParams,
PlannerRestrictionContext * PlannerRestrictionContext *
@ -118,6 +118,80 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node); static bool QueryIsNotSimpleSelect(Node *node);
static bool UpdateReferenceTablesWithShard(Node *node, void *context); static bool UpdateReferenceTablesWithShard(Node *node, void *context);
static PlannedStmt *
FastPastDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams,
Const *distributionKeyValue,
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
true;
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
distributionKeyValue;
PlannedStmt *fastPathPlan = FastPathPlanner(originalQuery, parse, boundParams);
return CreateDistributedPlannedStmt(fastPathPlan, originalQuery, parse,
boundParams, plannerRestrictionContext);
}
static PlannedStmt *
NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams,
List *rangeTableList,
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery, int rteIdCounter)
{
/*
* Call into standard_planner because the Citus planner relies on both the
* restriction information per table and parse tree transformations made by
* postgres' planner.
*/
PlannedStmt *standardPlan = standard_planner(parse, cursorOptions, boundParams);
/* may've inlined new relation rtes */
rangeTableList = ExtractRangeTableEntryList(parse);
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, originalQuery, parse,
boundParams,
plannerRestrictionContext);
bool setPartitionedTablesInherited = true;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
return result;
}
static PlannedStmt *
NonDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *result = standard_planner(parse, cursorOptions, boundParams);
bool hasExternParam = false;
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
&hasExternParam);
if (delegatePlan != NULL)
{
result = FinalizePlan(result, delegatePlan);
}
else if (hasExternParam)
{
/*
* As in CreateDistributedPlannedStmt, try dissuade planner when planning
* potentially failed due to unresolved prepared statement parameters.
*/
result->planTree->total_cost = FLT_MAX / 100000000;
}
return result;
}
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
@ -160,7 +234,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
} }
if (needsDistributedPlanning) if (fastPathRouterQuery)
{
/*
* We need to copy the parse tree because the FastPathPlanner modifies
* it.
*/
originalQuery = copyObject(parse);
}
else if (needsDistributedPlanning)
{ {
/* /*
* Inserting into a local table needs to go through the regular postgres * Inserting into a local table needs to go through the regular postgres
@ -168,7 +250,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* don't have a way of doing both things and therefore error out, but do * don't have a way of doing both things and therefore error out, but do
* have a handy tip for users. * have a handy tip for users.
*/ */
if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse)) if (InsertSelectIntoLocalTable(parse))
{ {
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
"local table"), "local table"),
@ -187,8 +269,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* Since fast-path queries do not through standard planner, we skip unnecessary * Since fast-path queries do not through standard planner, we skip unnecessary
* parts in that case. * parts in that case.
*/ */
if (!fastPathRouterQuery)
{
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
originalQuery = copyObject(parse); originalQuery = copyObject(parse);
@ -196,15 +276,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
AdjustPartitioningForDistributedPlanning(rangeTableList, AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited); setPartitionedTablesInherited);
} }
else
{
/*
* We still need to copy the parse tree because the FastPathPlanner
* modifies it.
*/
originalQuery = copyObject(parse);
}
}
/* /*
* Make sure that we hide shard names on the Citus MX worker nodes. See comments in * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
@ -216,16 +287,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
PlannerRestrictionContext *plannerRestrictionContext = PlannerRestrictionContext *plannerRestrictionContext =
CreateAndPushPlannerRestrictionContext(); CreateAndPushPlannerRestrictionContext();
PG_TRY();
{
/* /*
* We keep track of how many times we've recursed into the planner, primarily * We keep track of how many times we've recursed into the planner, primarily
* to detect whether we are in a function call. We need to make sure that the * to detect whether we are in a function call. We need to make sure that the
* PlannerLevel is decremented exactly once at the end of this PG_TRY block, * PlannerLevel is decremented exactly once at the end of the next PG_TRY
* both in the happy case and when an error occurs. * block, both in the happy case and when an error occurs.
*/ */
PlannerLevel++; PlannerLevel++;
PG_TRY();
{
/* /*
* For trivial queries, we're skipping the standard_planner() in * For trivial queries, we're skipping the standard_planner() in
* order to eliminate its overhead. * order to eliminate its overhead.
@ -234,63 +306,25 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* planner relies on both the restriction information per table and parse tree * planner relies on both the restriction information per table and parse tree
* transformations made by postgres' planner. * transformations made by postgres' planner.
*/ */
if (fastPathRouterQuery)
if (needsDistributedPlanning && fastPathRouterQuery)
{ {
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = result = FastPastDistributedStmt(parse, cursorOptions, boundParams,
true; distributionKeyValue,
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = plannerRestrictionContext,
distributionKeyValue; originalQuery);
}
result = FastPathPlanner(originalQuery, parse, boundParams); else if (needsDistributedPlanning)
{
result = NonFastPathDistributedStmt(parse, cursorOptions, boundParams,
rangeTableList,
plannerRestrictionContext, originalQuery,
rteIdCounter);
} }
else else
{ {
result = standard_planner(parse, cursorOptions, boundParams); result = NonDistributedStmt(parse, cursorOptions, boundParams);
if (needsDistributedPlanning)
{
/* may've inlined new relation rtes */
rangeTableList = ExtractRangeTableEntryList(parse);
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
} }
} }
if (needsDistributedPlanning)
{
uint64 planId = NextPlanId++;
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
boundParams, plannerRestrictionContext);
if (!fastPathRouterQuery)
{
setPartitionedTablesInherited = true;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
}
}
else
{
bool hasExternParam = false;
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
&hasExternParam);
if (delegatePlan != NULL)
{
result = FinalizePlan(result, delegatePlan);
}
else if (hasExternParam)
{
/*
* As in CreateDistributedPlannedStmt, try dissuade planner when planning
* potentially failed due to unresolved prepared statement parameters.
*/
result->planTree->total_cost = FLT_MAX / 100000000;
}
}
PlannerLevel--;
}
PG_CATCH(); PG_CATCH();
{ {
PopPlannerRestrictionContext(); PopPlannerRestrictionContext();
@ -301,6 +335,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
PG_END_TRY(); PG_END_TRY();
PlannerLevel--;
/* remove the context from the context list */ /* remove the context from the context list */
PopPlannerRestrictionContext(); PopPlannerRestrictionContext();
@ -583,10 +618,11 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
* query into a distributed plan that is encapsulated by a PlannedStmt. * query into a distributed plan that is encapsulated by a PlannedStmt.
*/ */
static PlannedStmt * static PlannedStmt *
CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams, Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
uint64 planId = NextPlanId++;
bool hasUnresolvedParams = false; bool hasUnresolvedParams = false;
JoinRestrictionContext *joinRestrictionContext = JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext; plannerRestrictionContext->joinRestrictionContext;