mirror of https://github.com/citusdata/citus.git
Rename some things based on reviews
parent
5f719b6358
commit
6251363bfb
|
@ -66,20 +66,32 @@ static uint64 NextPlanId = 1;
|
||||||
/* keep track of planner call stack levels */
|
/* keep track of planner call stack levels */
|
||||||
int PlannerLevel = 0;
|
int PlannerLevel = 0;
|
||||||
|
|
||||||
typedef struct DistributedStmtOptions
|
typedef struct DistributedPlanningContext
|
||||||
{
|
{
|
||||||
|
/* The parsed query that is given to the planner. It is a slightly modified
|
||||||
|
* to work with the standard_planner */
|
||||||
Query *parse;
|
Query *parse;
|
||||||
int cursorOptions;
|
|
||||||
ParamListInfo boundParams;
|
/* A copy of the original parsed query that is given to the planner. This
|
||||||
|
* doesn't contain the changes that are made to parse. This is NULL for non
|
||||||
|
* distributed plans, since those don't need it. */
|
||||||
Query *originalQuery;
|
Query *originalQuery;
|
||||||
|
|
||||||
|
/* the cursor options given to the planner */
|
||||||
|
int cursorOptions;
|
||||||
|
|
||||||
|
/* the ParamListInfo that is given to the planner */
|
||||||
|
ParamListInfo boundParams;
|
||||||
|
|
||||||
|
/* Our custom restriction context */
|
||||||
PlannerRestrictionContext *plannerRestrictionContext;
|
PlannerRestrictionContext *plannerRestrictionContext;
|
||||||
} DistributedStmtOptions;
|
} DistributedPlanningContext;
|
||||||
|
|
||||||
|
|
||||||
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan,
|
static PlannedStmt * CreateDistributedPlannedStmt(PlannedStmt *localPlan,
|
||||||
DistributedStmtOptions *opts);
|
DistributedPlanningContext *ctx);
|
||||||
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
||||||
Query *query, ParamListInfo boundParams,
|
Query *query, ParamListInfo boundParams,
|
||||||
bool hasUnresolvedParams,
|
bool hasUnresolvedParams,
|
||||||
|
@ -123,11 +135,11 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun
|
||||||
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
|
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
|
||||||
static bool QueryIsNotSimpleSelect(Node *node);
|
static bool QueryIsNotSimpleSelect(Node *node);
|
||||||
static bool UpdateReferenceTablesWithShard(Node *node, void *context);
|
static bool UpdateReferenceTablesWithShard(Node *node, void *context);
|
||||||
static PlannedStmt * FastPastDistributedStmt(DistributedStmtOptions *opts,
|
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *ctx,
|
||||||
Const *distributionKeyValue);
|
Const *distributionKeyValue);
|
||||||
static PlannedStmt * NonFastPathDistributedStmt(DistributedStmtOptions *opts,
|
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *ctx,
|
||||||
List *rangeTableList, int rteIdCounter);
|
List *rangeTableList, int rteIdCounter);
|
||||||
static PlannedStmt * NonDistributedStmt(DistributedStmtOptions *opts);
|
static PlannedStmt * PlanNonDistributedStmt(DistributedPlanningContext *ctx);
|
||||||
|
|
||||||
/* Distributed planner hook */
|
/* Distributed planner hook */
|
||||||
PlannedStmt *
|
PlannedStmt *
|
||||||
|
@ -140,7 +152,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
int rteIdCounter = 1;
|
int rteIdCounter = 1;
|
||||||
bool fastPathRouterQuery = false;
|
bool fastPathRouterQuery = false;
|
||||||
Const *distributionKeyValue = NULL;
|
Const *distributionKeyValue = NULL;
|
||||||
DistributedStmtOptions opts = {
|
DistributedPlanningContext ctx = {
|
||||||
.parse = parse,
|
.parse = parse,
|
||||||
.cursorOptions = cursorOptions,
|
.cursorOptions = cursorOptions,
|
||||||
.boundParams = boundParams,
|
.boundParams = boundParams,
|
||||||
|
@ -181,7 +193,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
* We need to copy the parse tree because the FastPathPlanner modifies
|
* We need to copy the parse tree because the FastPathPlanner modifies
|
||||||
* it.
|
* it.
|
||||||
*/
|
*/
|
||||||
opts.originalQuery = copyObject(parse);
|
ctx.originalQuery = copyObject(parse);
|
||||||
}
|
}
|
||||||
else if (needsDistributedPlanning)
|
else if (needsDistributedPlanning)
|
||||||
{
|
{
|
||||||
|
@ -208,7 +220,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
* distributed query.
|
* distributed query.
|
||||||
*/
|
*/
|
||||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||||
opts.originalQuery = copyObject(parse);
|
ctx.originalQuery = copyObject(parse);
|
||||||
|
|
||||||
setPartitionedTablesInherited = false;
|
setPartitionedTablesInherited = false;
|
||||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||||
|
@ -222,7 +234,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
ReplaceTableVisibleFunction((Node *) parse);
|
ReplaceTableVisibleFunction((Node *) parse);
|
||||||
|
|
||||||
/* create a restriction context and put it at the end if context list */
|
/* create a restriction context and put it at the end if context list */
|
||||||
opts.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
|
ctx.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We keep track of how many times we've recursed into the planner, primarily
|
* We keep track of how many times we've recursed into the planner, primarily
|
||||||
|
@ -245,15 +257,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
*/
|
*/
|
||||||
if (fastPathRouterQuery)
|
if (fastPathRouterQuery)
|
||||||
{
|
{
|
||||||
result = FastPastDistributedStmt(&opts, distributionKeyValue);
|
result = PlanFastPathDistributedStmt(&ctx, distributionKeyValue);
|
||||||
}
|
}
|
||||||
else if (needsDistributedPlanning)
|
else if (needsDistributedPlanning)
|
||||||
{
|
{
|
||||||
result = NonFastPathDistributedStmt(&opts, rangeTableList, rteIdCounter);
|
result = PlanDistributedStmt(&ctx, rangeTableList, rteIdCounter);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
result = NonDistributedStmt(&opts);
|
result = PlanNonDistributedStmt(&ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
|
@ -545,30 +557,30 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FastPathDistributedStmt creates a distributed planned statement using the
|
* PlanFastPathDistributedStmt creates a distributed planned statement using
|
||||||
* FastPathPlanner.
|
* the FastPathPlanner.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
static PlannedStmt *
|
||||||
FastPastDistributedStmt(DistributedStmtOptions *opts, Const *distributionKeyValue)
|
PlanFastPathDistributedStmt(DistributedPlanningContext *ctx, Const *distributionKeyValue)
|
||||||
{
|
{
|
||||||
opts->plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
ctx->plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
||||||
true;
|
true;
|
||||||
opts->plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
ctx->plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
||||||
distributionKeyValue;
|
distributionKeyValue;
|
||||||
|
|
||||||
PlannedStmt *fastPathPlan = FastPathPlanner(opts->originalQuery, opts->parse,
|
PlannedStmt *fastPathPlan = FastPathPlanner(ctx->originalQuery, ctx->parse,
|
||||||
opts->boundParams);
|
ctx->boundParams);
|
||||||
|
|
||||||
return CreateDistributedPlannedStmt(fastPathPlan, opts);
|
return CreateDistributedPlannedStmt(fastPathPlan, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NonFastPathDistributedStmt creates a distributed planned statement using the
|
* PlanDistributedStmt creates a distributed planned statement using the PG
|
||||||
* PG planner.
|
* planner.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
static PlannedStmt *
|
||||||
NonFastPathDistributedStmt(DistributedStmtOptions *opts,
|
PlanDistributedStmt(DistributedPlanningContext *ctx,
|
||||||
List *rangeTableList,
|
List *rangeTableList,
|
||||||
int rteIdCounter)
|
int rteIdCounter)
|
||||||
{
|
{
|
||||||
|
@ -577,15 +589,15 @@ NonFastPathDistributedStmt(DistributedStmtOptions *opts,
|
||||||
* restriction information per table and parse tree transformations made by
|
* restriction information per table and parse tree transformations made by
|
||||||
* postgres' planner.
|
* postgres' planner.
|
||||||
*/
|
*/
|
||||||
PlannedStmt *standardPlan = standard_planner(opts->parse, opts->cursorOptions,
|
PlannedStmt *standardPlan = standard_planner(ctx->parse, ctx->cursorOptions,
|
||||||
opts->boundParams);
|
ctx->boundParams);
|
||||||
|
|
||||||
/* may've inlined new relation rtes */
|
/* may've inlined new relation rtes */
|
||||||
rangeTableList = ExtractRangeTableEntryList(opts->parse);
|
rangeTableList = ExtractRangeTableEntryList(ctx->parse);
|
||||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||||
|
|
||||||
|
|
||||||
PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, opts);
|
PlannedStmt *result = CreateDistributedPlannedStmt(standardPlan, ctx);
|
||||||
|
|
||||||
bool setPartitionedTablesInherited = true;
|
bool setPartitionedTablesInherited = true;
|
||||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||||
|
@ -596,17 +608,17 @@ NonFastPathDistributedStmt(DistributedStmtOptions *opts,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NonDistributedStmt creates a normal (non-distributed) planned statement
|
* PlanNonDistributedStmt creates a normal (non-distributed) planned statement
|
||||||
* using the PG planner.
|
* using the PG planner.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
static PlannedStmt *
|
||||||
NonDistributedStmt(DistributedStmtOptions *opts)
|
PlanNonDistributedStmt(DistributedPlanningContext *ctx)
|
||||||
{
|
{
|
||||||
PlannedStmt *result = standard_planner(opts->parse, opts->cursorOptions,
|
PlannedStmt *result = standard_planner(ctx->parse, ctx->cursorOptions,
|
||||||
opts->boundParams);
|
ctx->boundParams);
|
||||||
|
|
||||||
bool hasExternParam = false;
|
bool hasExternParam = false;
|
||||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(opts->parse,
|
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(ctx->parse,
|
||||||
&hasExternParam);
|
&hasExternParam);
|
||||||
if (delegatePlan != NULL)
|
if (delegatePlan != NULL)
|
||||||
{
|
{
|
||||||
|
@ -630,25 +642,25 @@ NonDistributedStmt(DistributedStmtOptions *opts)
|
||||||
* query into a distributed plan that is encapsulated by a PlannedStmt.
|
* query into a distributed plan that is encapsulated by a PlannedStmt.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
static PlannedStmt *
|
||||||
CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedStmtOptions *opts)
|
CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedPlanningContext *ctx)
|
||||||
{
|
{
|
||||||
uint64 planId = NextPlanId++;
|
uint64 planId = NextPlanId++;
|
||||||
bool hasUnresolvedParams = false;
|
bool hasUnresolvedParams = false;
|
||||||
JoinRestrictionContext *joinRestrictionContext =
|
JoinRestrictionContext *joinRestrictionContext =
|
||||||
opts->plannerRestrictionContext->joinRestrictionContext;
|
ctx->plannerRestrictionContext->joinRestrictionContext;
|
||||||
|
|
||||||
if (HasUnresolvedExternParamsWalker((Node *) opts->originalQuery, opts->boundParams))
|
if (HasUnresolvedExternParamsWalker((Node *) ctx->originalQuery, ctx->boundParams))
|
||||||
{
|
{
|
||||||
hasUnresolvedParams = true;
|
hasUnresolvedParams = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
opts->plannerRestrictionContext->joinRestrictionContext =
|
ctx->plannerRestrictionContext->joinRestrictionContext =
|
||||||
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
||||||
|
|
||||||
DistributedPlan *distributedPlan =
|
DistributedPlan *distributedPlan =
|
||||||
CreateDistributedPlan(planId, opts->originalQuery, opts->parse,
|
CreateDistributedPlan(planId, ctx->originalQuery, ctx->parse,
|
||||||
opts->boundParams,
|
ctx->boundParams,
|
||||||
hasUnresolvedParams, opts->plannerRestrictionContext);
|
hasUnresolvedParams, ctx->plannerRestrictionContext);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If no plan was generated, prepare a generic error to be emitted.
|
* If no plan was generated, prepare a generic error to be emitted.
|
||||||
|
@ -701,7 +713,7 @@ CreateDistributedPlannedStmt(PlannedStmt *localPlan, DistributedStmtOptions *opt
|
||||||
* if it is planned as a multi shard modify query.
|
* if it is planned as a multi shard modify query.
|
||||||
*/
|
*/
|
||||||
if ((distributedPlan->planningError ||
|
if ((distributedPlan->planningError ||
|
||||||
(IsUpdateOrDelete(opts->originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
|
(IsUpdateOrDelete(ctx->originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
|
||||||
hasUnresolvedParams)
|
hasUnresolvedParams)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue