diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 91a3e40e8..227b99c18 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,34 +66,10 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -typedef struct DistributedPlanningContext -{ - /* The parsed query that is given to the planner. It is a slightly modified - * to work with the standard_planner */ - Query *parse; - - /* A copy of the original parsed query that is given to the planner. This - * doesn't contain most of the changes that are made to parse. There's one - * that change that is made for non fast path router queries though, which - * is the assigning of RTE identities using AssignRTEIdentities. This is - * NULL for non distributed plans, since those don't need it. */ - Query *originalQuery; - - /* the cursor options given to the planner */ - int cursorOptions; - - /* the ParamListInfo that is given to the planner */ - ParamListInfo boundParams; - - /* Our custom restriction context */ - PlannerRestrictionContext *plannerRestrictionContext; -} DistributedPlanningContext; - - static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); -static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan, - DistributedPlanningContext *ctx); +static PlannedStmt * CreateDistributedPlannedStmt( + DistributedPlanningContext *planContext); static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, @@ -108,8 +84,6 @@ static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AdjustPartitioningForDistributedPlanning(List *rangeTableList, bool setPartitionedTablesInherited); -static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, - DistributedPlan *distributedPlan); static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan); @@ -137,14 +111,10 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); -static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *ctx, +static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, Const *distributionKeyValue); -static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *ctx, +static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, List *rangeTableList, int rteIdCounter); -static PlannedStmt * PlanDelegatedFunctionCall(DistributedPlanningContext *ctx, - DistributedPlan *delegatedPlan); -static PlannedStmt * PlanNonDistributedStmt(DistributedPlanningContext *ctx, - bool hasExternParam); /* Distributed planner hook */ PlannedStmt * @@ -157,7 +127,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) int rteIdCounter = 1; bool fastPathRouterQuery = false; Const *distributionKeyValue = NULL; - DistributedPlanningContext ctx = { + DistributedPlanningContext planContext = { .parse = parse, .cursorOptions = cursorOptions, .boundParams = boundParams, @@ -200,7 +170,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * too, but for those it needs to be done AFTER calling * AssignRTEIdentities. */ - ctx.originalQuery = copyObject(parse); + planContext.originalQuery = copyObject(parse); } else if (needsDistributedPlanning) { @@ -226,7 +196,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * to be present in the copied query too. */ rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - ctx.originalQuery = copyObject(parse); + planContext.originalQuery = copyObject(parse); setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, @@ -240,7 +210,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ReplaceTableVisibleFunction((Node *) parse); /* create a restriction context and put it at the end if context list */ - ctx.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); /* * We keep track of how many times we've recursed into the planner, primarily @@ -253,25 +223,28 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PG_TRY(); { - DistributedPlan *delegatedPlan = NULL; - bool hasExternParam = false; - if (fastPathRouterQuery) { - result = PlanFastPathDistributedStmt(&ctx, distributionKeyValue); - } - else if (needsDistributedPlanning) - { - result = PlanDistributedStmt(&ctx, rangeTableList, rteIdCounter); - } - else if ((delegatedPlan = TryToDelegateFunctionCall(ctx.parse, - &hasExternParam))) - { - result = PlanDelegatedFunctionCall(&ctx, delegatedPlan); + result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); } else { - result = PlanNonDistributedStmt(&ctx, hasExternParam); + /* + * Call into standard_planner because the Citus planner relies on both the + * restriction information per table and parse tree transformations made by + * postgres' planner. + */ + planContext.plan = standard_planner(planContext.parse, + planContext.cursorOptions, + planContext.boundParams); + if (needsDistributedPlanning) + { + result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter); + } + else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL) + { + result = planContext.plan; + } } } PG_CATCH(); @@ -567,17 +540,18 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) * the FastPathPlanner. */ static PlannedStmt * -PlanFastPathDistributedStmt(DistributedPlanningContext *ctx, Const *distributionKeyValue) +PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, + Const *distributionKeyValue) { - ctx->plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = - true; - ctx->plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = - distributionKeyValue; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + fastPathRouterQuery = true; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + distributionKeyValue = distributionKeyValue; - PlannedStmt *fastPathPlan = FastPathPlanner(ctx->originalQuery, ctx->parse, - ctx->boundParams); + planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->parse, + planContext->boundParams); - return CreateDistributedPlannedStmt(fastPathPlan, ctx); + return CreateDistributedPlannedStmt(planContext); } @@ -586,24 +560,16 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *ctx, Const *distribution * planner. */ static PlannedStmt * -PlanDistributedStmt(DistributedPlanningContext *ctx, +PlanDistributedStmt(DistributedPlanningContext *planContext, List *rangeTableList, int rteIdCounter) { - /* - * Call into standard_planner because the Citus planner relies on both the - * restriction information per table and parse tree transformations made by - * postgres' planner. - */ - PlannedStmt *standardPlan = standard_planner(ctx->parse, ctx->cursorOptions, - ctx->boundParams); - /* may've inlined new relation rtes */ - rangeTableList = ExtractRangeTableEntryList(ctx->parse); + rangeTableList = ExtractRangeTableEntryList(planContext->parse); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, ctx); + PlannedStmt *result = CreateDistributedPlannedStmt(planContext); bool setPartitionedTablesInherited = true; AdjustPartitioningForDistributedPlanning(rangeTableList, @@ -614,41 +580,17 @@ PlanDistributedStmt(DistributedPlanningContext *ctx, /* - * PlanDelegatedFunctionCall creates a plan by delagating the function call to - * the worker. + * DissuadePlannerFromUsingPlan try dissuade planner when planning a plan that + * potentially failed due to unresolved prepared statement parameters. */ -static PlannedStmt * -PlanDelegatedFunctionCall(DistributedPlanningContext *ctx, DistributedPlan *delegatedPlan) +void +DissuadePlannerFromUsingPlan(PlannedStmt *plan) { - PlannedStmt *result = standard_planner(ctx->parse, ctx->cursorOptions, - ctx->boundParams); - - result = FinalizePlan(result, delegatedPlan); - - return result; -} - - -/* - * PlanNonDistributedStmt creates a normal (non-distributed) planned statement - * using the PG planner. - */ -static PlannedStmt * -PlanNonDistributedStmt(DistributedPlanningContext *ctx, bool hasExternParam) -{ - PlannedStmt *result = standard_planner(ctx->parse, ctx->cursorOptions, - ctx->boundParams); - - if (hasExternParam) - { - /* - * As in CreateDistributedPlannedStmt, try dissuade planner when planning - * potentially failed due to unresolved prepared statement parameters. - */ - result->planTree->total_cost = FLT_MAX / 100000000; - } - - return result; + /* + * Arbitrarily high cost, but low enough that it can be added up + * without overflowing by choose_custom_plan(). + */ + plan->planTree->total_cost = FLT_MAX / 100000000; } @@ -657,25 +599,27 @@ PlanNonDistributedStmt(DistributedPlanningContext *ctx, bool hasExternParam) * query into a distributed plan that is encapsulated by a PlannedStmt. */ static PlannedStmt * -CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedPlanningContext *ctx) +CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) { uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; JoinRestrictionContext *joinRestrictionContext = - ctx->plannerRestrictionContext->joinRestrictionContext; + planContext->plannerRestrictionContext->joinRestrictionContext; - if (HasUnresolvedExternParamsWalker((Node *) ctx->originalQuery, ctx->boundParams)) + if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery, + planContext->boundParams)) { hasUnresolvedParams = true; } - ctx->plannerRestrictionContext->joinRestrictionContext = + planContext->plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); DistributedPlan *distributedPlan = - CreateDistributedPlan(planId, ctx->originalQuery, ctx->parse, - ctx->boundParams, - hasUnresolvedParams, ctx->plannerRestrictionContext); + CreateDistributedPlan(planId, planContext->originalQuery, planContext->parse, + planContext->boundParams, + hasUnresolvedParams, + planContext->plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. @@ -720,7 +664,7 @@ CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedPlanningContext distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - PlannedStmt *resultPlan = FinalizePlan(localPlan, distributedPlan); + PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -728,14 +672,11 @@ CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedPlanningContext * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(ctx->originalQuery) && IsMultiTaskPlan(distributedPlan))) && + (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + distributedPlan))) && hasUnresolvedParams) { - /* - * Arbitrarily high cost, but low enough that it can be added up - * without overflowing by choose_custom_plan(). - */ - resultPlan->planTree->total_cost = FLT_MAX / 100000000; + DissuadePlannerFromUsingPlan(resultPlan); } return resultPlan; @@ -1173,7 +1114,7 @@ GetDistributedPlan(CustomScan *customScan) * FinalizePlan combines local plan with distributed plan and creates a plan * which can be run by the PostgreSQL executor. */ -static PlannedStmt * +PlannedStmt * FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) { PlannedStmt *finalPlan = NULL; diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 8201abb32..a8600c5ec 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -95,8 +95,8 @@ contain_param_walker(Node *node, void *context) * forms involving multiple function calls, FROM clauses, WHERE clauses, * ... Those complex forms are handled in the coordinator. */ -DistributedPlan * -TryToDelegateFunctionCall(Query *query, bool *hasExternParam) +PlannedStmt * +TryToDelegateFunctionCall(DistributedPlanningContext *planContext) { List *targetList = NIL; TargetEntry *targetEntry = NULL; @@ -114,12 +114,9 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) StringInfo queryString = NULL; Task *task = NULL; Job *job = NULL; - DistributedPlan *distributedPlan = NULL; + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); struct ParamWalkerContext walkerParamContext = { 0 }; - /* set hasExternParam now in case of early exit */ - *hasExternParam = false; - if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4)) { /* Citus is not ready to determine whether function is distributed */ @@ -133,19 +130,19 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) return NULL; } - if (query == NULL) + if (planContext->parse == NULL) { /* no query (mostly here to be defensive) */ return NULL; } - if (query->commandType != CMD_SELECT) + if (planContext->parse->commandType != CMD_SELECT) { /* not a SELECT */ return NULL; } - FromExpr *joinTree = query->jointree; + FromExpr *joinTree = planContext->parse->jointree; if (joinTree == NULL) { /* no join tree (mostly here to be defensive) */ @@ -174,7 +171,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (IsA(reference, RangeTblRef)) { - RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable); + RangeTblEntry *rtentry = rt_fetch(reference->rtindex, + planContext->parse->rtable); if (rtentry->rtekind != RTE_RESULT) { /* e.g. SELECT f() FROM rel */ @@ -203,8 +201,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) #endif } - targetList = query->targetList; - if (list_length(query->targetList) != 1) + targetList = planContext->parse->targetList; + if (list_length(planContext->parse->targetList) != 1) { /* multiple target list items */ return NULL; @@ -288,7 +286,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (partitionParam->paramkind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); return NULL; } } @@ -354,7 +352,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (walkerParamContext.paramKind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); } else { @@ -367,7 +365,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) ereport(DEBUG1, (errmsg("pushing down the function call"))); queryString = makeStringInfo(); - pg_get_query_def(query, queryString); + pg_get_query_def(planContext->parse, queryString); task = CitusMakeNode(Task); task->taskType = SELECT_TASK; @@ -378,7 +376,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) job = CitusMakeNode(Job); job->jobId = UniqueJobId(); - job->jobQuery = query; + job->jobQuery = planContext->parse; job->taskList = list_make1(task); distributedPlan = CitusMakeNode(DistributedPlan); @@ -390,5 +388,5 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) /* worker will take care of any necessary locking, treat query as read-only */ distributedPlan->modLevel = ROW_MODIFY_READONLY; - return distributedPlan; + return FinalizePlan(planContext->plan, distributedPlan); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 8513b7d60..0af62f2a2 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -129,6 +129,33 @@ typedef struct RelationRowLock } RelationRowLock; +typedef struct DistributedPlanningContext +{ + /* The parsed query that is given to the planner. It is a slightly modified + * to work with the standard_planner */ + Query *parse; + + /* A copy of the original parsed query that is given to the planner. This + * doesn't contain most of the changes that are made to parse. There's one + * that change that is made for non fast path router queries though, which + * is the assigning of RTE identities using AssignRTEIdentities. This is + * NULL for non distributed plans, since those don't need it. */ + Query *originalQuery; + + /* the cursor options given to the planner */ + int cursorOptions; + + /* the ParamListInfo that is given to the planner */ + ParamListInfo boundParams; + + /* Plan created either by standard_planner or by FastPathPlanner */ + PlannedStmt *plan; + + /* Our custom restriction context */ + PlannerRestrictionContext *plannerRestrictionContext; +} DistributedPlanningContext; + + extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern List * ExtractRangeTableEntryList(Query *query); @@ -150,5 +177,8 @@ extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); extern int32 BlessRecordExpression(Expr *expr); +extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); +extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, + struct DistributedPlan *distributedPlan); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 5cb5e6058..865ac0ce1 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -10,10 +10,12 @@ #define FUNCTION_CALL_DELEGATION_H #include "postgres.h" + +#include "distributed/distributed_planner.h" #include "distributed/multi_physical_planner.h" -DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam); +PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); #endif /* FUNCTION_CALL_DELEGATION_H */