diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 87d309f49..7042a2801 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,14 +66,20 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; +typedef struct DistributedStmtOptions +{ + Query *parse; + int cursorOptions; + ParamListInfo boundParams; + Query *originalQuery; + PlannerRestrictionContext *plannerRestrictionContext; +} DistributedStmtOptions; + static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan, - Query *originalQuery, Query *query, - ParamListInfo boundParams, - PlannerRestrictionContext * - plannerRestrictionContext); + DistributedStmtOptions *opts); static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, @@ -118,46 +124,40 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); - static PlannedStmt * -FastPastDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams, - Const *distributionKeyValue, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) +FastPastDistributedStmt(DistributedStmtOptions *opts, Const *distributionKeyValue) { - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = + opts->plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = true; - plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = + opts->plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = distributionKeyValue; - PlannedStmt *fastPathPlan = FastPathPlanner(originalQuery, parse, boundParams); + PlannedStmt *fastPathPlan = FastPathPlanner(opts->originalQuery, opts->parse, + opts->boundParams); - return CreateDistributedPlannedStmt(fastPathPlan, originalQuery, parse, - boundParams, plannerRestrictionContext); + return CreateDistributedPlannedStmt(fastPathPlan, opts); } static PlannedStmt * -NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams, +NonFastPathDistributedStmt(DistributedStmtOptions *opts, List *rangeTableList, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery, int rteIdCounter) + int rteIdCounter) { /* * Call into standard_planner because the Citus planner relies on both the * restriction information per table and parse tree transformations made by * postgres' planner. */ - PlannedStmt *standardPlan = standard_planner(parse, cursorOptions, boundParams); + PlannedStmt *standardPlan = standard_planner(opts->parse, opts->cursorOptions, + opts->boundParams); /* may've inlined new relation rtes */ - rangeTableList = ExtractRangeTableEntryList(parse); + rangeTableList = ExtractRangeTableEntryList(opts->parse); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, originalQuery, parse, - boundParams, - plannerRestrictionContext); + PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, opts); bool setPartitionedTablesInherited = true; AdjustPartitioningForDistributedPlanning(rangeTableList, @@ -168,12 +168,13 @@ NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundP static PlannedStmt * -NonDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams) +NonDistributedStmt(DistributedStmtOptions *opts) { - PlannedStmt *result = standard_planner(parse, cursorOptions, boundParams); + PlannedStmt *result = standard_planner(opts->parse, opts->cursorOptions, + opts->boundParams); bool hasExternParam = false; - DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse, + DistributedPlan *delegatePlan = TryToDelegateFunctionCall(opts->parse, &hasExternParam); if (delegatePlan != NULL) { @@ -198,12 +199,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result = NULL; bool needsDistributedPlanning = false; - Query *originalQuery = NULL; bool setPartitionedTablesInherited = false; List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; bool fastPathRouterQuery = false; Const *distributionKeyValue = NULL; + DistributedStmtOptions opts = { + .parse = parse, + .cursorOptions = cursorOptions, + .boundParams = boundParams, + }; if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { @@ -240,7 +245,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * We need to copy the parse tree because the FastPathPlanner modifies * it. */ - originalQuery = copyObject(parse); + opts.originalQuery = copyObject(parse); } else if (needsDistributedPlanning) { @@ -270,7 +275,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * parts in that case. */ rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - originalQuery = copyObject(parse); + opts.originalQuery = copyObject(parse); setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, @@ -284,8 +289,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ReplaceTableVisibleFunction((Node *) parse); /* create a restriction context and put it at the end if context list */ - PlannerRestrictionContext *plannerRestrictionContext = - CreateAndPushPlannerRestrictionContext(); + opts.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); /* * We keep track of how many times we've recursed into the planner, primarily @@ -308,21 +312,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) */ if (fastPathRouterQuery) { - result = FastPastDistributedStmt(parse, cursorOptions, boundParams, - distributionKeyValue, - plannerRestrictionContext, - originalQuery); + result = FastPastDistributedStmt(&opts, distributionKeyValue); } else if (needsDistributedPlanning) { - result = NonFastPathDistributedStmt(parse, cursorOptions, boundParams, - rangeTableList, - plannerRestrictionContext, originalQuery, - rteIdCounter); + result = NonFastPathDistributedStmt(&opts, rangeTableList, rteIdCounter); } else { - result = NonDistributedStmt(parse, cursorOptions, boundParams); + result = NonDistributedStmt(&opts); } } PG_CATCH(); @@ -618,26 +616,25 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) * query into a distributed plan that is encapsulated by a PlannedStmt. */ static PlannedStmt * -CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery, - Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) +CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedStmtOptions *opts) { uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; JoinRestrictionContext *joinRestrictionContext = - plannerRestrictionContext->joinRestrictionContext; + opts->plannerRestrictionContext->joinRestrictionContext; - if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams)) + if (HasUnresolvedExternParamsWalker((Node *) opts->originalQuery, opts->boundParams)) { hasUnresolvedParams = true; } - plannerRestrictionContext->joinRestrictionContext = + opts->plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); DistributedPlan *distributedPlan = - CreateDistributedPlan(planId, originalQuery, query, boundParams, - hasUnresolvedParams, plannerRestrictionContext); + CreateDistributedPlan(planId, opts->originalQuery, opts->parse, + opts->boundParams, + hasUnresolvedParams, opts->plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. @@ -690,7 +687,7 @@ CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery, * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && + (IsUpdateOrDelete(opts->originalQuery) && IsMultiTaskPlan(distributedPlan))) && hasUnresolvedParams) { /*