mirror of https://github.com/citusdata/citus.git
Pack shared arguments in a struct
parent
b4381df8be
commit
d2f31a2dc2
|
@ -66,14 +66,20 @@ static uint64 NextPlanId = 1;
|
|||
/* keep track of planner call stack levels */
|
||||
int PlannerLevel = 0;
|
||||
|
||||
typedef struct DistributedStmtOptions
|
||||
{
|
||||
Query *parse;
|
||||
int cursorOptions;
|
||||
ParamListInfo boundParams;
|
||||
Query *originalQuery;
|
||||
PlannerRestrictionContext *plannerRestrictionContext;
|
||||
} DistributedStmtOptions;
|
||||
|
||||
|
||||
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||
static bool IsUpdateOrDelete(Query *query);
|
||||
static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan,
|
||||
Query *originalQuery, Query *query,
|
||||
ParamListInfo boundParams,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
DistributedStmtOptions *opts);
|
||||
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
||||
Query *query, ParamListInfo boundParams,
|
||||
bool hasUnresolvedParams,
|
||||
|
@ -118,46 +124,40 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
|
|||
static bool QueryIsNotSimpleSelect(Node *node);
|
||||
static bool UpdateReferenceTablesWithShard(Node *node, void *context);
|
||||
|
||||
|
||||
static PlannedStmt *
|
||||
FastPastDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams,
|
||||
Const *distributionKeyValue,
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
Query *originalQuery)
|
||||
FastPastDistributedStmt(DistributedStmtOptions *opts, Const *distributionKeyValue)
|
||||
{
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
||||
opts->plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
||||
true;
|
||||
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
||||
opts->plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
||||
distributionKeyValue;
|
||||
|
||||
PlannedStmt *fastPathPlan = FastPathPlanner(originalQuery, parse, boundParams);
|
||||
PlannedStmt *fastPathPlan = FastPathPlanner(opts->originalQuery, opts->parse,
|
||||
opts->boundParams);
|
||||
|
||||
return CreateDistributedPlannedStmt(fastPathPlan, originalQuery, parse,
|
||||
boundParams, plannerRestrictionContext);
|
||||
return CreateDistributedPlannedStmt(fastPathPlan, opts);
|
||||
}
|
||||
|
||||
|
||||
static PlannedStmt *
|
||||
NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams,
|
||||
NonFastPathDistributedStmt(DistributedStmtOptions *opts,
|
||||
List *rangeTableList,
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
Query *originalQuery, int rteIdCounter)
|
||||
int rteIdCounter)
|
||||
{
|
||||
/*
|
||||
* Call into standard_planner because the Citus planner relies on both the
|
||||
* restriction information per table and parse tree transformations made by
|
||||
* postgres' planner.
|
||||
*/
|
||||
PlannedStmt *standardPlan = standard_planner(parse, cursorOptions, boundParams);
|
||||
PlannedStmt *standardPlan = standard_planner(opts->parse, opts->cursorOptions,
|
||||
opts->boundParams);
|
||||
|
||||
/* may've inlined new relation rtes */
|
||||
rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
rangeTableList = ExtractRangeTableEntryList(opts->parse);
|
||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||
|
||||
|
||||
PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, originalQuery, parse,
|
||||
boundParams,
|
||||
plannerRestrictionContext);
|
||||
PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, opts);
|
||||
|
||||
bool setPartitionedTablesInherited = true;
|
||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
|
@ -168,12 +168,13 @@ NonFastPathDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundP
|
|||
|
||||
|
||||
static PlannedStmt *
|
||||
NonDistributedStmt(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||
NonDistributedStmt(DistributedStmtOptions *opts)
|
||||
{
|
||||
PlannedStmt *result = standard_planner(parse, cursorOptions, boundParams);
|
||||
PlannedStmt *result = standard_planner(opts->parse, opts->cursorOptions,
|
||||
opts->boundParams);
|
||||
|
||||
bool hasExternParam = false;
|
||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
|
||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(opts->parse,
|
||||
&hasExternParam);
|
||||
if (delegatePlan != NULL)
|
||||
{
|
||||
|
@ -198,12 +199,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
{
|
||||
PlannedStmt *result = NULL;
|
||||
bool needsDistributedPlanning = false;
|
||||
Query *originalQuery = NULL;
|
||||
bool setPartitionedTablesInherited = false;
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
int rteIdCounter = 1;
|
||||
bool fastPathRouterQuery = false;
|
||||
Const *distributionKeyValue = NULL;
|
||||
DistributedStmtOptions opts = {
|
||||
.parse = parse,
|
||||
.cursorOptions = cursorOptions,
|
||||
.boundParams = boundParams,
|
||||
};
|
||||
|
||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||
{
|
||||
|
@ -240,7 +245,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
* We need to copy the parse tree because the FastPathPlanner modifies
|
||||
* it.
|
||||
*/
|
||||
originalQuery = copyObject(parse);
|
||||
opts.originalQuery = copyObject(parse);
|
||||
}
|
||||
else if (needsDistributedPlanning)
|
||||
{
|
||||
|
@ -270,7 +275,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
* parts in that case.
|
||||
*/
|
||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||
originalQuery = copyObject(parse);
|
||||
opts.originalQuery = copyObject(parse);
|
||||
|
||||
setPartitionedTablesInherited = false;
|
||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
|
@ -284,8 +289,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
ReplaceTableVisibleFunction((Node *) parse);
|
||||
|
||||
/* create a restriction context and put it at the end if context list */
|
||||
PlannerRestrictionContext *plannerRestrictionContext =
|
||||
CreateAndPushPlannerRestrictionContext();
|
||||
opts.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
|
||||
|
||||
/*
|
||||
* We keep track of how many times we've recursed into the planner, primarily
|
||||
|
@ -308,21 +312,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
*/
|
||||
if (fastPathRouterQuery)
|
||||
{
|
||||
result = FastPastDistributedStmt(parse, cursorOptions, boundParams,
|
||||
distributionKeyValue,
|
||||
plannerRestrictionContext,
|
||||
originalQuery);
|
||||
result = FastPastDistributedStmt(&opts, distributionKeyValue);
|
||||
}
|
||||
else if (needsDistributedPlanning)
|
||||
{
|
||||
result = NonFastPathDistributedStmt(parse, cursorOptions, boundParams,
|
||||
rangeTableList,
|
||||
plannerRestrictionContext, originalQuery,
|
||||
rteIdCounter);
|
||||
result = NonFastPathDistributedStmt(&opts, rangeTableList, rteIdCounter);
|
||||
}
|
||||
else
|
||||
{
|
||||
result = NonDistributedStmt(parse, cursorOptions, boundParams);
|
||||
result = NonDistributedStmt(&opts);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
|
@ -618,26 +616,25 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
|
|||
* query into a distributed plan that is encapsulated by a PlannedStmt.
|
||||
*/
|
||||
static PlannedStmt *
|
||||
CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery,
|
||||
Query *query, ParamListInfo boundParams,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedStmtOptions *opts)
|
||||
{
|
||||
uint64 planId = NextPlanId++;
|
||||
bool hasUnresolvedParams = false;
|
||||
JoinRestrictionContext *joinRestrictionContext =
|
||||
plannerRestrictionContext->joinRestrictionContext;
|
||||
opts->plannerRestrictionContext->joinRestrictionContext;
|
||||
|
||||
if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams))
|
||||
if (HasUnresolvedExternParamsWalker((Node *) opts->originalQuery, opts->boundParams))
|
||||
{
|
||||
hasUnresolvedParams = true;
|
||||
}
|
||||
|
||||
plannerRestrictionContext->joinRestrictionContext =
|
||||
opts->plannerRestrictionContext->joinRestrictionContext =
|
||||
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
||||
|
||||
DistributedPlan *distributedPlan =
|
||||
CreateDistributedPlan(planId, originalQuery, query, boundParams,
|
||||
hasUnresolvedParams, plannerRestrictionContext);
|
||||
CreateDistributedPlan(planId, opts->originalQuery, opts->parse,
|
||||
opts->boundParams,
|
||||
hasUnresolvedParams, opts->plannerRestrictionContext);
|
||||
|
||||
/*
|
||||
* If no plan was generated, prepare a generic error to be emitted.
|
||||
|
@ -690,7 +687,7 @@ CreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery,
|
|||
* if it is planned as a multi shard modify query.
|
||||
*/
|
||||
if ((distributedPlan->planningError ||
|
||||
(IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
|
||||
(IsUpdateOrDelete(opts->originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
|
||||
hasUnresolvedParams)
|
||||
{
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue