mirror of https://github.com/citusdata/citus.git
Refactor distributed_planner for better understandability
parent
5a1e752726
commit
5b0baea72c
|
@ -66,14 +66,10 @@ static uint64 NextPlanId = 1;
|
||||||
/* keep track of planner call stack levels */
|
/* keep track of planner call stack levels */
|
||||||
int PlannerLevel = 0;
|
int PlannerLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan,
|
static PlannedStmt * CreateDistributedPlannedStmt(
|
||||||
Query *originalQuery, Query *query,
|
DistributedPlanningContext *planContext);
|
||||||
ParamListInfo boundParams,
|
|
||||||
PlannerRestrictionContext *
|
|
||||||
plannerRestrictionContext);
|
|
||||||
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
||||||
Query *query, ParamListInfo boundParams,
|
Query *query, ParamListInfo boundParams,
|
||||||
bool hasUnresolvedParams,
|
bool hasUnresolvedParams,
|
||||||
|
@ -88,8 +84,6 @@ static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
|
||||||
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
|
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
|
||||||
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
|
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
|
||||||
bool setPartitionedTablesInherited);
|
bool setPartitionedTablesInherited);
|
||||||
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
|
|
||||||
DistributedPlan *distributedPlan);
|
|
||||||
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
CustomScan *customScan);
|
CustomScan *customScan);
|
||||||
|
@ -117,6 +111,10 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun
|
||||||
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
|
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
|
||||||
static bool QueryIsNotSimpleSelect(Node *node);
|
static bool QueryIsNotSimpleSelect(Node *node);
|
||||||
static bool UpdateReferenceTablesWithShard(Node *node, void *context);
|
static bool UpdateReferenceTablesWithShard(Node *node, void *context);
|
||||||
|
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
|
||||||
|
Const *distributionKeyValue);
|
||||||
|
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
|
||||||
|
List *rangeTableList, int rteIdCounter);
|
||||||
|
|
||||||
/* Distributed planner hook */
|
/* Distributed planner hook */
|
||||||
PlannedStmt *
|
PlannedStmt *
|
||||||
|
@ -124,12 +122,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
{
|
{
|
||||||
PlannedStmt *result = NULL;
|
PlannedStmt *result = NULL;
|
||||||
bool needsDistributedPlanning = false;
|
bool needsDistributedPlanning = false;
|
||||||
Query *originalQuery = NULL;
|
|
||||||
bool setPartitionedTablesInherited = false;
|
bool setPartitionedTablesInherited = false;
|
||||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||||
int rteIdCounter = 1;
|
int rteIdCounter = 1;
|
||||||
bool fastPathRouterQuery = false;
|
bool fastPathRouterQuery = false;
|
||||||
Const *distributionKeyValue = NULL;
|
Const *distributionKeyValue = NULL;
|
||||||
|
DistributedPlanningContext planContext = {
|
||||||
|
.query = parse,
|
||||||
|
.cursorOptions = cursorOptions,
|
||||||
|
.boundParams = boundParams,
|
||||||
|
};
|
||||||
|
|
||||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||||
{
|
{
|
||||||
|
@ -160,7 +162,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
if (fastPathRouterQuery)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We need to copy the parse tree because the FastPathPlanner modifies
|
||||||
|
* it. In the next branch we do the same for other distributed queries
|
||||||
|
* too, but for those it needs to be done AFTER calling
|
||||||
|
* AssignRTEIdentities.
|
||||||
|
*/
|
||||||
|
planContext.originalQuery = copyObject(parse);
|
||||||
|
}
|
||||||
|
else if (needsDistributedPlanning)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Inserting into a local table needs to go through the regular postgres
|
* Inserting into a local table needs to go through the regular postgres
|
||||||
|
@ -168,7 +180,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
* don't have a way of doing both things and therefore error out, but do
|
* don't have a way of doing both things and therefore error out, but do
|
||||||
* have a handy tip for users.
|
* have a handy tip for users.
|
||||||
*/
|
*/
|
||||||
if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse))
|
if (InsertSelectIntoLocalTable(parse))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
|
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
|
||||||
"local table"),
|
"local table"),
|
||||||
|
@ -179,32 +191,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* standard_planner scribbles on it's input, but for deparsing we need the
|
* standard_planner scribbles on it's input, but for deparsing we need the
|
||||||
* unmodified form. Note that we keep RTE_RELATIONs with their identities
|
* unmodified form. Note that before copying we call
|
||||||
* set, which doesn't break our goals, but, prevents us keeping an extra copy
|
* AssignRTEIdentities, which is needed because these identities need
|
||||||
* of the query tree. Note that we copy the query tree once we're sure it's a
|
* to be present in the copied query too.
|
||||||
* distributed query.
|
|
||||||
*
|
|
||||||
* Since fast-path queries do not through standard planner, we skip unnecessary
|
|
||||||
* parts in that case.
|
|
||||||
*/
|
*/
|
||||||
if (!fastPathRouterQuery)
|
|
||||||
{
|
|
||||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||||
originalQuery = copyObject(parse);
|
planContext.originalQuery = copyObject(parse);
|
||||||
|
|
||||||
setPartitionedTablesInherited = false;
|
setPartitionedTablesInherited = false;
|
||||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||||
setPartitionedTablesInherited);
|
setPartitionedTablesInherited);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We still need to copy the parse tree because the FastPathPlanner
|
|
||||||
* modifies it.
|
|
||||||
*/
|
|
||||||
originalQuery = copyObject(parse);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make sure that we hide shard names on the Citus MX worker nodes. See comments in
|
* Make sure that we hide shard names on the Citus MX worker nodes. See comments in
|
||||||
|
@ -213,83 +210,42 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
ReplaceTableVisibleFunction((Node *) parse);
|
ReplaceTableVisibleFunction((Node *) parse);
|
||||||
|
|
||||||
/* create a restriction context and put it at the end if context list */
|
/* create a restriction context and put it at the end if context list */
|
||||||
PlannerRestrictionContext *plannerRestrictionContext =
|
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
|
||||||
CreateAndPushPlannerRestrictionContext();
|
|
||||||
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
/*
|
/*
|
||||||
* We keep track of how many times we've recursed into the planner, primarily
|
* We keep track of how many times we've recursed into the planner, primarily
|
||||||
* to detect whether we are in a function call. We need to make sure that the
|
* to detect whether we are in a function call. We need to make sure that the
|
||||||
* PlannerLevel is decremented exactly once at the end of this PG_TRY block,
|
* PlannerLevel is decremented exactly once at the end of the next PG_TRY
|
||||||
* both in the happy case and when an error occurs.
|
* block, both in the happy case and when an error occurs.
|
||||||
*/
|
*/
|
||||||
PlannerLevel++;
|
PlannerLevel++;
|
||||||
|
|
||||||
/*
|
|
||||||
* For trivial queries, we're skipping the standard_planner() in
|
|
||||||
* order to eliminate its overhead.
|
|
||||||
*
|
|
||||||
* Otherwise, call into standard planner. This is required because the Citus
|
|
||||||
* planner relies on both the restriction information per table and parse tree
|
|
||||||
* transformations made by postgres' planner.
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (needsDistributedPlanning && fastPathRouterQuery)
|
PG_TRY();
|
||||||
{
|
{
|
||||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
if (fastPathRouterQuery)
|
||||||
true;
|
{
|
||||||
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
|
||||||
distributionKeyValue;
|
|
||||||
|
|
||||||
result = FastPathPlanner(originalQuery, parse, boundParams);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
|
||||||
result = standard_planner(parse, cursorOptions, boundParams);
|
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
|
||||||
{
|
|
||||||
/* may've inlined new relation rtes */
|
|
||||||
rangeTableList = ExtractRangeTableEntryList(parse);
|
|
||||||
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
|
||||||
{
|
|
||||||
uint64 planId = NextPlanId++;
|
|
||||||
|
|
||||||
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
|
|
||||||
boundParams, plannerRestrictionContext);
|
|
||||||
|
|
||||||
if (!fastPathRouterQuery)
|
|
||||||
{
|
|
||||||
setPartitionedTablesInherited = true;
|
|
||||||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
|
||||||
setPartitionedTablesInherited);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool hasExternParam = false;
|
|
||||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
|
|
||||||
&hasExternParam);
|
|
||||||
if (delegatePlan != NULL)
|
|
||||||
{
|
|
||||||
result = FinalizePlan(result, delegatePlan);
|
|
||||||
}
|
|
||||||
else if (hasExternParam)
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* As in CreateDistributedPlannedStmt, try dissuade planner when planning
|
* Call into standard_planner because the Citus planner relies on both the
|
||||||
* potentially failed due to unresolved prepared statement parameters.
|
* restriction information per table and parse tree transformations made by
|
||||||
|
* postgres' planner.
|
||||||
*/
|
*/
|
||||||
result->planTree->total_cost = FLT_MAX / 100000000;
|
planContext.plan = standard_planner(planContext.query,
|
||||||
|
planContext.cursorOptions,
|
||||||
|
planContext.boundParams);
|
||||||
|
if (needsDistributedPlanning)
|
||||||
|
{
|
||||||
|
result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter);
|
||||||
|
}
|
||||||
|
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
|
||||||
|
{
|
||||||
|
result = planContext.plan;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PlannerLevel--;
|
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
|
@ -301,6 +257,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
}
|
}
|
||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
|
|
||||||
|
PlannerLevel--;
|
||||||
|
|
||||||
/* remove the context from the context list */
|
/* remove the context from the context list */
|
||||||
PopPlannerRestrictionContext();
|
PopPlannerRestrictionContext();
|
||||||
|
@ -578,30 +535,91 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PlanFastPathDistributedStmt creates a distributed planned statement using
|
||||||
|
* the FastPathPlanner.
|
||||||
|
*/
|
||||||
|
static PlannedStmt *
|
||||||
|
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
|
||||||
|
Const *distributionKeyValue)
|
||||||
|
{
|
||||||
|
planContext->plannerRestrictionContext->fastPathRestrictionContext->
|
||||||
|
fastPathRouterQuery = true;
|
||||||
|
planContext->plannerRestrictionContext->fastPathRestrictionContext->
|
||||||
|
distributionKeyValue = distributionKeyValue;
|
||||||
|
|
||||||
|
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
|
||||||
|
planContext->boundParams);
|
||||||
|
|
||||||
|
return CreateDistributedPlannedStmt(planContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PlanDistributedStmt creates a distributed planned statement using the PG
|
||||||
|
* planner.
|
||||||
|
*/
|
||||||
|
static PlannedStmt *
|
||||||
|
PlanDistributedStmt(DistributedPlanningContext *planContext,
|
||||||
|
List *rangeTableList,
|
||||||
|
int rteIdCounter)
|
||||||
|
{
|
||||||
|
/* may've inlined new relation rtes */
|
||||||
|
rangeTableList = ExtractRangeTableEntryList(planContext->query);
|
||||||
|
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
|
||||||
|
|
||||||
|
|
||||||
|
PlannedStmt *result = CreateDistributedPlannedStmt(planContext);
|
||||||
|
|
||||||
|
bool setPartitionedTablesInherited = true;
|
||||||
|
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||||
|
setPartitionedTablesInherited);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DissuadePlannerFromUsingPlan try dissuade planner when planning a plan that
|
||||||
|
* potentially failed due to unresolved prepared statement parameters.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
DissuadePlannerFromUsingPlan(PlannedStmt *plan)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Arbitrarily high cost, but low enough that it can be added up
|
||||||
|
* without overflowing by choose_custom_plan().
|
||||||
|
*/
|
||||||
|
plan->planTree->total_cost = FLT_MAX / 100000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
|
* CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
|
||||||
* query into a distributed plan that is encapsulated by a PlannedStmt.
|
* query into a distributed plan that is encapsulated by a PlannedStmt.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
static PlannedStmt *
|
||||||
CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery,
|
CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
Query *query, ParamListInfo boundParams,
|
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
|
||||||
{
|
{
|
||||||
|
uint64 planId = NextPlanId++;
|
||||||
bool hasUnresolvedParams = false;
|
bool hasUnresolvedParams = false;
|
||||||
JoinRestrictionContext *joinRestrictionContext =
|
JoinRestrictionContext *joinRestrictionContext =
|
||||||
plannerRestrictionContext->joinRestrictionContext;
|
planContext->plannerRestrictionContext->joinRestrictionContext;
|
||||||
|
|
||||||
if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams))
|
if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
|
||||||
|
planContext->boundParams))
|
||||||
{
|
{
|
||||||
hasUnresolvedParams = true;
|
hasUnresolvedParams = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
plannerRestrictionContext->joinRestrictionContext =
|
planContext->plannerRestrictionContext->joinRestrictionContext =
|
||||||
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
RemoveDuplicateJoinRestrictions(joinRestrictionContext);
|
||||||
|
|
||||||
DistributedPlan *distributedPlan =
|
DistributedPlan *distributedPlan =
|
||||||
CreateDistributedPlan(planId, originalQuery, query, boundParams,
|
CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
|
||||||
hasUnresolvedParams, plannerRestrictionContext);
|
planContext->boundParams,
|
||||||
|
hasUnresolvedParams,
|
||||||
|
planContext->plannerRestrictionContext);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If no plan was generated, prepare a generic error to be emitted.
|
* If no plan was generated, prepare a generic error to be emitted.
|
||||||
|
@ -646,7 +664,7 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
|
||||||
distributedPlan->planId = planId;
|
distributedPlan->planId = planId;
|
||||||
|
|
||||||
/* create final plan by combining local plan with distributed plan */
|
/* create final plan by combining local plan with distributed plan */
|
||||||
PlannedStmt *resultPlan = FinalizePlan(localPlan, distributedPlan);
|
PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* As explained above, force planning costs to be unrealistically high if
|
* As explained above, force planning costs to be unrealistically high if
|
||||||
|
@ -654,14 +672,11 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
|
||||||
* if it is planned as a multi shard modify query.
|
* if it is planned as a multi shard modify query.
|
||||||
*/
|
*/
|
||||||
if ((distributedPlan->planningError ||
|
if ((distributedPlan->planningError ||
|
||||||
(IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
|
(IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
|
||||||
|
distributedPlan))) &&
|
||||||
hasUnresolvedParams)
|
hasUnresolvedParams)
|
||||||
{
|
{
|
||||||
/*
|
DissuadePlannerFromUsingPlan(resultPlan);
|
||||||
* Arbitrarily high cost, but low enough that it can be added up
|
|
||||||
* without overflowing by choose_custom_plan().
|
|
||||||
*/
|
|
||||||
resultPlan->planTree->total_cost = FLT_MAX / 100000000;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return resultPlan;
|
return resultPlan;
|
||||||
|
@ -1099,7 +1114,7 @@ GetDistributedPlan(CustomScan *customScan)
|
||||||
* FinalizePlan combines local plan with distributed plan and creates a plan
|
* FinalizePlan combines local plan with distributed plan and creates a plan
|
||||||
* which can be run by the PostgreSQL executor.
|
* which can be run by the PostgreSQL executor.
|
||||||
*/
|
*/
|
||||||
static PlannedStmt *
|
PlannedStmt *
|
||||||
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
|
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
|
||||||
{
|
{
|
||||||
PlannedStmt *finalPlan = NULL;
|
PlannedStmt *finalPlan = NULL;
|
||||||
|
|
|
@ -95,8 +95,8 @@ contain_param_walker(Node *node, void *context)
|
||||||
* forms involving multiple function calls, FROM clauses, WHERE clauses,
|
* forms involving multiple function calls, FROM clauses, WHERE clauses,
|
||||||
* ... Those complex forms are handled in the coordinator.
|
* ... Those complex forms are handled in the coordinator.
|
||||||
*/
|
*/
|
||||||
DistributedPlan *
|
PlannedStmt *
|
||||||
TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
||||||
{
|
{
|
||||||
List *targetList = NIL;
|
List *targetList = NIL;
|
||||||
TargetEntry *targetEntry = NULL;
|
TargetEntry *targetEntry = NULL;
|
||||||
|
@ -114,12 +114,9 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
DistributedPlan *distributedPlan = NULL;
|
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||||
struct ParamWalkerContext walkerParamContext = { 0 };
|
struct ParamWalkerContext walkerParamContext = { 0 };
|
||||||
|
|
||||||
/* set hasExternParam now in case of early exit */
|
|
||||||
*hasExternParam = false;
|
|
||||||
|
|
||||||
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
|
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
|
||||||
{
|
{
|
||||||
/* Citus is not ready to determine whether function is distributed */
|
/* Citus is not ready to determine whether function is distributed */
|
||||||
|
@ -133,19 +130,19 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (query == NULL)
|
if (planContext->query == NULL)
|
||||||
{
|
{
|
||||||
/* no query (mostly here to be defensive) */
|
/* no query (mostly here to be defensive) */
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (query->commandType != CMD_SELECT)
|
if (planContext->query->commandType != CMD_SELECT)
|
||||||
{
|
{
|
||||||
/* not a SELECT */
|
/* not a SELECT */
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
FromExpr *joinTree = query->jointree;
|
FromExpr *joinTree = planContext->query->jointree;
|
||||||
if (joinTree == NULL)
|
if (joinTree == NULL)
|
||||||
{
|
{
|
||||||
/* no join tree (mostly here to be defensive) */
|
/* no join tree (mostly here to be defensive) */
|
||||||
|
@ -174,7 +171,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
|
|
||||||
if (IsA(reference, RangeTblRef))
|
if (IsA(reference, RangeTblRef))
|
||||||
{
|
{
|
||||||
RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable);
|
RangeTblEntry *rtentry = rt_fetch(reference->rtindex,
|
||||||
|
planContext->query->rtable);
|
||||||
if (rtentry->rtekind != RTE_RESULT)
|
if (rtentry->rtekind != RTE_RESULT)
|
||||||
{
|
{
|
||||||
/* e.g. SELECT f() FROM rel */
|
/* e.g. SELECT f() FROM rel */
|
||||||
|
@ -203,8 +201,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
targetList = query->targetList;
|
targetList = planContext->query->targetList;
|
||||||
if (list_length(query->targetList) != 1)
|
if (list_length(planContext->query->targetList) != 1)
|
||||||
{
|
{
|
||||||
/* multiple target list items */
|
/* multiple target list items */
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -288,7 +286,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
if (partitionParam->paramkind == PARAM_EXTERN)
|
if (partitionParam->paramkind == PARAM_EXTERN)
|
||||||
{
|
{
|
||||||
/* Don't log a message, we should end up here again without a parameter */
|
/* Don't log a message, we should end up here again without a parameter */
|
||||||
*hasExternParam = true;
|
DissuadePlannerFromUsingPlan(planContext->plan);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -354,7 +352,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
if (walkerParamContext.paramKind == PARAM_EXTERN)
|
if (walkerParamContext.paramKind == PARAM_EXTERN)
|
||||||
{
|
{
|
||||||
/* Don't log a message, we should end up here again without a parameter */
|
/* Don't log a message, we should end up here again without a parameter */
|
||||||
*hasExternParam = true;
|
DissuadePlannerFromUsingPlan(planContext->plan);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -367,7 +365,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
||||||
|
|
||||||
queryString = makeStringInfo();
|
queryString = makeStringInfo();
|
||||||
pg_get_query_def(query, queryString);
|
pg_get_query_def(planContext->query, queryString);
|
||||||
|
|
||||||
task = CitusMakeNode(Task);
|
task = CitusMakeNode(Task);
|
||||||
task->taskType = SELECT_TASK;
|
task->taskType = SELECT_TASK;
|
||||||
|
@ -378,7 +376,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
|
|
||||||
job = CitusMakeNode(Job);
|
job = CitusMakeNode(Job);
|
||||||
job->jobId = UniqueJobId();
|
job->jobId = UniqueJobId();
|
||||||
job->jobQuery = query;
|
job->jobQuery = planContext->query;
|
||||||
job->taskList = list_make1(task);
|
job->taskList = list_make1(task);
|
||||||
|
|
||||||
distributedPlan = CitusMakeNode(DistributedPlan);
|
distributedPlan = CitusMakeNode(DistributedPlan);
|
||||||
|
@ -390,5 +388,5 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
/* worker will take care of any necessary locking, treat query as read-only */
|
/* worker will take care of any necessary locking, treat query as read-only */
|
||||||
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
||||||
|
|
||||||
return distributedPlan;
|
return FinalizePlan(planContext->plan, distributedPlan);
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,7 +156,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
||||||
static uint64 GetAnchorShardId(List *relationShardList);
|
static uint64 GetAnchorShardId(List *relationShardList);
|
||||||
static List * TargetShardIntervalForFastPathQuery(Query *query,
|
static List * TargetShardIntervalForFastPathQuery(Query *query,
|
||||||
Const **partitionValueConst,
|
Const **partitionValueConst,
|
||||||
bool *isMultiShardQuery);
|
bool *isMultiShardQuery,
|
||||||
|
Const *distributionKeyValue);
|
||||||
static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
|
static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
|
||||||
List *relationShardList, List *placementList,
|
List *relationShardList, List *placementList,
|
||||||
uint64 shardId);
|
uint64 shardId);
|
||||||
|
@ -2043,31 +2044,12 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
*/
|
*/
|
||||||
if (fastPathRouterQuery)
|
if (fastPathRouterQuery)
|
||||||
{
|
{
|
||||||
List *shardIntervalList = NIL;
|
|
||||||
Const *distributionKeyValue =
|
Const *distributionKeyValue =
|
||||||
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue;
|
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue;
|
||||||
|
|
||||||
if (distributionKeyValue)
|
List *shardIntervalList =
|
||||||
{
|
|
||||||
Oid relationId = ExtractFirstDistributedTableId(originalQuery);
|
|
||||||
DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId);
|
|
||||||
ShardInterval *shardInterval =
|
|
||||||
FindShardInterval(distributionKeyValue->constvalue, cache);
|
|
||||||
|
|
||||||
shardIntervalList = list_make1(shardInterval);
|
|
||||||
|
|
||||||
if (partitionValueConst != NULL)
|
|
||||||
{
|
|
||||||
/* set the outgoing partition column value if requested */
|
|
||||||
*partitionValueConst = distributionKeyValue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
shardIntervalList =
|
|
||||||
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
|
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
|
||||||
&isMultiShardQuery);
|
&isMultiShardQuery, distributionKeyValue);
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2289,15 +2271,27 @@ GetAnchorShardId(List *prunedShardIntervalListList)
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
|
TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
|
||||||
bool *isMultiShardQuery)
|
bool *isMultiShardQuery, Const *distributionKeyValue)
|
||||||
{
|
{
|
||||||
Const *queryPartitionValueConst = NULL;
|
|
||||||
|
|
||||||
Oid relationId = ExtractFirstDistributedTableId(query);
|
Oid relationId = ExtractFirstDistributedTableId(query);
|
||||||
|
|
||||||
|
if (distributionKeyValue)
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId);
|
||||||
|
ShardInterval *shardInterval =
|
||||||
|
FindShardInterval(distributionKeyValue->constvalue, cache);
|
||||||
|
|
||||||
|
if (partitionValueConst != NULL)
|
||||||
|
{
|
||||||
|
/* set the outgoing partition column value if requested */
|
||||||
|
*partitionValueConst = distributionKeyValue;
|
||||||
|
}
|
||||||
|
return list_make1(shardInterval);
|
||||||
|
}
|
||||||
|
|
||||||
Node *quals = query->jointree->quals;
|
Node *quals = query->jointree->quals;
|
||||||
|
|
||||||
int relationIndex = 1;
|
int relationIndex = 1;
|
||||||
|
Const *queryPartitionValueConst = NULL;
|
||||||
List *prunedShardIntervalList =
|
List *prunedShardIntervalList =
|
||||||
PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals),
|
PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals),
|
||||||
&queryPartitionValueConst);
|
&queryPartitionValueConst);
|
||||||
|
|
|
@ -129,6 +129,33 @@ typedef struct RelationRowLock
|
||||||
} RelationRowLock;
|
} RelationRowLock;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct DistributedPlanningContext
|
||||||
|
{
|
||||||
|
/* The parsed query that is given to the planner. It is a slightly modified
|
||||||
|
* to work with the standard_planner */
|
||||||
|
Query *query;
|
||||||
|
|
||||||
|
/* A copy of the original parsed query that is given to the planner. This
|
||||||
|
* doesn't contain most of the changes that are made to parse. There's one
|
||||||
|
* that change that is made for non fast path router queries though, which
|
||||||
|
* is the assigning of RTE identities using AssignRTEIdentities. This is
|
||||||
|
* NULL for non distributed plans, since those don't need it. */
|
||||||
|
Query *originalQuery;
|
||||||
|
|
||||||
|
/* the cursor options given to the planner */
|
||||||
|
int cursorOptions;
|
||||||
|
|
||||||
|
/* the ParamListInfo that is given to the planner */
|
||||||
|
ParamListInfo boundParams;
|
||||||
|
|
||||||
|
/* Plan created either by standard_planner or by FastPathPlanner */
|
||||||
|
PlannedStmt *plan;
|
||||||
|
|
||||||
|
/* Our custom restriction context */
|
||||||
|
PlannerRestrictionContext *plannerRestrictionContext;
|
||||||
|
} DistributedPlanningContext;
|
||||||
|
|
||||||
|
|
||||||
extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
|
extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
|
||||||
ParamListInfo boundParams);
|
ParamListInfo boundParams);
|
||||||
extern List * ExtractRangeTableEntryList(Query *query);
|
extern List * ExtractRangeTableEntryList(Query *query);
|
||||||
|
@ -150,5 +177,8 @@ extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
|
||||||
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
|
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
|
||||||
extern int GetRTEIdentity(RangeTblEntry *rte);
|
extern int GetRTEIdentity(RangeTblEntry *rte);
|
||||||
extern int32 BlessRecordExpression(Expr *expr);
|
extern int32 BlessRecordExpression(Expr *expr);
|
||||||
|
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
|
||||||
|
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
|
||||||
|
struct DistributedPlan *distributedPlan);
|
||||||
|
|
||||||
#endif /* DISTRIBUTED_PLANNER_H */
|
#endif /* DISTRIBUTED_PLANNER_H */
|
||||||
|
|
|
@ -10,10 +10,12 @@
|
||||||
#define FUNCTION_CALL_DELEGATION_H
|
#define FUNCTION_CALL_DELEGATION_H
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "distributed/distributed_planner.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
|
||||||
|
|
||||||
DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam);
|
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);
|
||||||
|
|
||||||
|
|
||||||
#endif /* FUNCTION_CALL_DELEGATION_H */
|
#endif /* FUNCTION_CALL_DELEGATION_H */
|
||||||
|
|
Loading…
Reference in New Issue