Use fewer CPU cycles on fast-path planning (#3332)

Fast-path queries were introduced in #2606. The basic idea is that for very simple queries like SELECT count(*) FROM table WHERE dist_key = X, we can skip some parts of the distributed planning. The most notable thing to skip is standard_planner(), which was already done in #2606.

With this commit, we do some further optimizations. First, we used to call the function that decides whether a query is fast-path twice; it is now called only once. Second, we used to do shard pruning for every query; now we optimize it for some cases. Finally, since the definition of fast-path queries is very strict, we can skip some query traversals.
pull/3324/head
Jelte Fennema 2020-01-06 14:54:11 +01:00 committed by GitHub
commit 16b4140dc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 305 additions and 131 deletions

View File

@ -66,14 +66,10 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */ /* keep track of planner call stack levels */
int PlannerLevel = 0; int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query); static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, static PlannedStmt * CreateDistributedPlannedStmt(
Query *originalQuery, Query *query, DistributedPlanningContext *planContext);
ParamListInfo boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, ParamListInfo boundParams, Query *query, ParamListInfo boundParams,
bool hasUnresolvedParams, bool hasUnresolvedParams,
@ -88,8 +84,6 @@ static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList, static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
bool setPartitionedTablesInherited); bool setPartitionedTablesInherited);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan, DistributedPlan *distributedPlan,
CustomScan *customScan); CustomScan *customScan);
@ -117,6 +111,10 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun
static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node); static bool QueryIsNotSimpleSelect(Node *node);
static bool UpdateReferenceTablesWithShard(Node *node, void *context); static bool UpdateReferenceTablesWithShard(Node *node, void *context);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Const *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
List *rangeTableList, int rteIdCounter);
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
@ -124,10 +122,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{ {
PlannedStmt *result = NULL; PlannedStmt *result = NULL;
bool needsDistributedPlanning = false; bool needsDistributedPlanning = false;
Query *originalQuery = NULL;
bool setPartitionedTablesInherited = false; bool setPartitionedTablesInherited = false;
List *rangeTableList = ExtractRangeTableEntryList(parse); List *rangeTableList = ExtractRangeTableEntryList(parse);
int rteIdCounter = 1; int rteIdCounter = 1;
bool fastPathRouterQuery = false;
Const *distributionKeyValue = NULL;
DistributedPlanningContext planContext = {
.query = parse,
.cursorOptions = cursorOptions,
.boundParams = boundParams,
};
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{ {
@ -151,10 +155,24 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
else else
{ {
needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
if (needsDistributedPlanning)
{
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
}
} }
} }
if (needsDistributedPlanning) if (fastPathRouterQuery)
{
/*
* We need to copy the parse tree because the FastPathPlanner modifies
* it. In the next branch we do the same for other distributed queries
* too, but for those it needs to be done AFTER calling
* AssignRTEIdentities.
*/
planContext.originalQuery = copyObject(parse);
}
else if (needsDistributedPlanning)
{ {
/* /*
* Inserting into a local table needs to go through the regular postgres * Inserting into a local table needs to go through the regular postgres
@ -173,13 +191,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/* /*
* standard_planner scribbles on it's input, but for deparsing we need the * standard_planner scribbles on it's input, but for deparsing we need the
* unmodified form. Note that we keep RTE_RELATIONs with their identities * unmodified form. Note that before copying we call
* set, which doesn't break our goals, but, prevents us keeping an extra copy * AssignRTEIdentities, which is needed because these identities need
* of the query tree. Note that we copy the query tree once we're sure it's a * to be present in the copied query too.
* distributed query.
*/ */
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
originalQuery = copyObject(parse); planContext.originalQuery = copyObject(parse);
setPartitionedTablesInherited = false; setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList, AdjustPartitioningForDistributedPlanning(rangeTableList,
@ -193,75 +210,42 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
ReplaceTableVisibleFunction((Node *) parse); ReplaceTableVisibleFunction((Node *) parse);
/* create a restriction context and put it at the end if context list */ /* create a restriction context and put it at the end if context list */
PlannerRestrictionContext *plannerRestrictionContext = planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
CreateAndPushPlannerRestrictionContext();
/*
* We keep track of how many times we've recursed into the planner, primarily
* to detect whether we are in a function call. We need to make sure that the
* PlannerLevel is decremented exactly once at the end of the next PG_TRY
* block, both in the happy case and when an error occurs.
*/
PlannerLevel++;
PG_TRY(); PG_TRY();
{ {
/* if (fastPathRouterQuery)
* We keep track of how many times we've recursed into the planner, primarily
* to detect whether we are in a function call. We need to make sure that the
* PlannerLevel is decremented exactly once at the end of this PG_TRY block,
* both in the happy case and when an error occurs.
*/
PlannerLevel++;
/*
* For trivial queries, we're skipping the standard_planner() in
* order to eliminate its overhead.
*
* Otherwise, call into standard planner. This is required because the Citus
* planner relies on both the restriction information per table and parse tree
* transformations made by postgres' planner.
*/
if (needsDistributedPlanning && FastPathRouterQuery(originalQuery))
{ {
result = FastPathPlanner(originalQuery, parse, boundParams); result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
} }
else else
{ {
result = standard_planner(parse, cursorOptions, boundParams); /*
* Call into standard_planner because the Citus planner relies on both the
* restriction information per table and parse tree transformations made by
* postgres' planner.
*/
planContext.plan = standard_planner(planContext.query,
planContext.cursorOptions,
planContext.boundParams);
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
/* may've inlined new relation rtes */ result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter);
rangeTableList = ExtractRangeTableEntryList(parse);
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
} }
} else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
if (needsDistributedPlanning)
{
uint64 planId = NextPlanId++;
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
boundParams, plannerRestrictionContext);
setPartitionedTablesInherited = true;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
}
else
{
bool hasExternParam = false;
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse,
&hasExternParam);
if (delegatePlan != NULL)
{ {
result = FinalizePlan(result, delegatePlan); result = planContext.plan;
}
else if (hasExternParam)
{
/*
* As in CreateDistributedPlannedStmt, try dissuade planner when planning
* potentially failed due to unresolved prepared statement parameters.
*/
result->planTree->total_cost = FLT_MAX / 100000000;
} }
} }
PlannerLevel--;
} }
PG_CATCH(); PG_CATCH();
{ {
@ -273,6 +257,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
PG_END_TRY(); PG_END_TRY();
PlannerLevel--;
/* remove the context from the context list */ /* remove the context from the context list */
PopPlannerRestrictionContext(); PopPlannerRestrictionContext();
@ -550,30 +535,91 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
} }
/*
 * PlanFastPathDistributedStmt builds the distributed planned statement for a
 * query that was classified as a fast-path router query.
 *
 * The fast-path restriction context is flagged as fast-path and given the
 * distribution key value handed in by the caller. FastPathPlanner then
 * produces the plan that is stored in the planning context, and the context
 * is passed on to CreateDistributedPlannedStmt, whose result is returned.
 */
static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
							Const *distributionKeyValue)
{
	/* record the fast-path facts so later planning steps can rely on them */
	planContext->plannerRestrictionContext->fastPathRestrictionContext->
	distributionKeyValue = distributionKeyValue;
	planContext->plannerRestrictionContext->fastPathRestrictionContext->
	fastPathRouterQuery = true;

	/* FastPathPlanner is used here instead of standard_planner */
	PlannedStmt *fastPathPlan = FastPathPlanner(planContext->originalQuery,
												planContext->query,
												planContext->boundParams);
	planContext->plan = fastPathPlan;

	PlannedStmt *distributedStmt = CreateDistributedPlannedStmt(planContext);

	return distributedStmt;
}
/*
 * PlanDistributedStmt builds the distributed planned statement for a query
 * whose plan in the planning context came from the PG planner.
 *
 * The incoming rangeTableList value is not used: the list is re-extracted
 * from planContext->query. RTE identities are assigned starting from
 * rteIdCounter before CreateDistributedPlannedStmt is called, and
 * AdjustPartitioningForDistributedPlanning is called on the re-extracted
 * list (with setPartitionedTablesInherited = true) afterwards.
 */
static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext *planContext,
					List *rangeTableList,
					int rteIdCounter)
{
	/* the planner may have inlined new relation RTEs, so look them up again */
	List *currentRangeTableList = ExtractRangeTableEntryList(planContext->query);

	/* the returned counter is not needed beyond this point */
	(void) AssignRTEIdentities(currentRangeTableList, rteIdCounter);

	PlannedStmt *distributedStmt = CreateDistributedPlannedStmt(planContext);

	AdjustPartitioningForDistributedPlanning(currentRangeTableList, true);

	return distributedStmt;
}
/*
 * DissuadePlannerFromUsingPlan tries to dissuade the planner from using the
 * given plan, for cases where planning potentially failed due to unresolved
 * prepared statement parameters. It does so by overwriting the total cost of
 * the plan's top-level plan tree node with a very high value.
 */
void
DissuadePlannerFromUsingPlan(PlannedStmt *plan)
{
	/*
	 * The cost is arbitrarily high, yet low enough that choose_custom_plan()
	 * can add it up without overflowing.
	 */
	const double dissuasiveCost = FLT_MAX / 100000000;

	plan->planTree->total_cost = dissuasiveCost;
}
/* /*
* CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
* query into a distributed plan that is encapsulated by a PlannedStmt. * query into a distributed plan that is encapsulated by a PlannedStmt.
*/ */
static PlannedStmt * static PlannedStmt *
CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{ {
uint64 planId = NextPlanId++;
bool hasUnresolvedParams = false; bool hasUnresolvedParams = false;
JoinRestrictionContext *joinRestrictionContext = JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext; planContext->plannerRestrictionContext->joinRestrictionContext;
if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams)) if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
planContext->boundParams))
{ {
hasUnresolvedParams = true; hasUnresolvedParams = true;
} }
plannerRestrictionContext->joinRestrictionContext = planContext->plannerRestrictionContext->joinRestrictionContext =
RemoveDuplicateJoinRestrictions(joinRestrictionContext); RemoveDuplicateJoinRestrictions(joinRestrictionContext);
DistributedPlan *distributedPlan = DistributedPlan *distributedPlan =
CreateDistributedPlan(planId, originalQuery, query, boundParams, CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
hasUnresolvedParams, plannerRestrictionContext); planContext->boundParams,
hasUnresolvedParams,
planContext->plannerRestrictionContext);
/* /*
* If no plan was generated, prepare a generic error to be emitted. * If no plan was generated, prepare a generic error to be emitted.
@ -618,7 +664,7 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
distributedPlan->planId = planId; distributedPlan->planId = planId;
/* create final plan by combining local plan with distributed plan */ /* create final plan by combining local plan with distributed plan */
PlannedStmt *resultPlan = FinalizePlan(localPlan, distributedPlan); PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);
/* /*
* As explained above, force planning costs to be unrealistically high if * As explained above, force planning costs to be unrealistically high if
@ -626,14 +672,11 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
* if it is planned as a multi shard modify query. * if it is planned as a multi shard modify query.
*/ */
if ((distributedPlan->planningError || if ((distributedPlan->planningError ||
(IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
distributedPlan))) &&
hasUnresolvedParams) hasUnresolvedParams)
{ {
/* DissuadePlannerFromUsingPlan(resultPlan);
* Arbitrarily high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan().
*/
resultPlan->planTree->total_cost = FLT_MAX / 100000000;
} }
return resultPlan; return resultPlan;
@ -657,7 +700,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
DistributedPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL; bool hasCtes = originalQuery->cteList != NIL;
if (IsModifyCommand(originalQuery)) if (IsModifyCommand(originalQuery))
{ {
EnsureModificationsCanRun(); EnsureModificationsCanRun();
@ -866,7 +908,14 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
static void static void
FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery) FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
{ {
RecordSubPlansUsedInPlan(plan, originalQuery); /*
* Fast-path queries cannot have any subplans by definition,
* so skip the expensive traversals for them.
*/
if (!plan->fastPathRouterPlan)
{
RecordSubPlansUsedInPlan(plan, originalQuery);
}
} }
@ -1065,7 +1114,7 @@ GetDistributedPlan(CustomScan *customScan)
* FinalizePlan combines local plan with distributed plan and creates a plan * FinalizePlan combines local plan with distributed plan and creates a plan
* which can be run by the PostgreSQL executor. * which can be run by the PostgreSQL executor.
*/ */
static PlannedStmt * PlannedStmt *
FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
{ {
PlannedStmt *finalPlan = NULL; PlannedStmt *finalPlan = NULL;
@ -1870,6 +1919,9 @@ CreateAndPushPlannerRestrictionContext(void)
plannerRestrictionContext->joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext =
palloc0(sizeof(JoinRestrictionContext)); palloc0(sizeof(JoinRestrictionContext));
plannerRestrictionContext->fastPathRestrictionContext =
palloc0(sizeof(FastPathRestrictionContext));
plannerRestrictionContext->memoryContext = CurrentMemoryContext; plannerRestrictionContext->memoryContext = CurrentMemoryContext;
/* we'll apply logical AND as we add tables */ /* we'll apply logical AND as we add tables */
@ -1929,6 +1981,10 @@ ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionCont
plannerRestrictionContext->joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext =
palloc0(sizeof(JoinRestrictionContext)); palloc0(sizeof(JoinRestrictionContext));
plannerRestrictionContext->fastPathRestrictionContext =
palloc0(sizeof(FastPathRestrictionContext));
/* we'll apply logical AND as we add tables */ /* we'll apply logical AND as we add tables */
plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true; plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
} }

View File

@ -58,8 +58,10 @@
bool EnableFastPathRouterPlanner = true; bool EnableFastPathRouterPlanner = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool ConjunctionContainsColumnFilter(Node *node, Var *column); static bool ConjunctionContainsColumnFilter(Node *node, Var *column,
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn); Const **distributionKeyValue);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
Const **distributionKeyValue);
/* /*
@ -122,7 +124,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
SeqScan *seqScanNode = makeNode(SeqScan); SeqScan *seqScanNode = makeNode(SeqScan);
Plan *plan = &seqScanNode->plan; Plan *plan = &seqScanNode->plan;
AssertArg(FastPathRouterQuery(parse)); Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
AssertArg(FastPathRouterQuery(parse, &distKey));
/* there is only a single relation rte */ /* there is only a single relation rte */
seqScanNode->scanrelid = 1; seqScanNode->scanrelid = 1;
@ -162,11 +166,12 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
* and it should be ANDed with any other filters. Also, the distribution * and it should be ANDed with any other filters. Also, the distribution
* key should only exists once in the WHERE clause. So basically, * key should only exists once in the WHERE clause. So basically,
* SELECT ... FROM dist_table WHERE dist_key = X * SELECT ... FROM dist_table WHERE dist_key = X
* If the filter is a const, distributionKeyValue is set
* - All INSERT statements (including multi-row INSERTs) as long as the commands * - All INSERT statements (including multi-row INSERTs) as long as the commands
* don't have any sublinks/CTEs etc * don't have any sublinks/CTEs etc
*/ */
bool bool
FastPathRouterQuery(Query *query) FastPathRouterQuery(Query *query, Const **distributionKeyValue)
{ {
FromExpr *joinTree = query->jointree; FromExpr *joinTree = query->jointree;
Node *quals = NULL; Node *quals = NULL;
@ -254,7 +259,7 @@ FastPathRouterQuery(Query *query)
* This is to simplify both of the individual checks and omit various edge cases * This is to simplify both of the individual checks and omit various edge cases
* that might arise with multiple distribution keys in the quals. * that might arise with multiple distribution keys in the quals.
*/ */
if (ConjunctionContainsColumnFilter(quals, distributionKey) && if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
!ColumnAppearsMultipleTimes(quals, distributionKey)) !ColumnAppearsMultipleTimes(quals, distributionKey))
{ {
return true; return true;
@ -298,9 +303,11 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
* ConjunctionContainsColumnFilter returns true if the query contains an exact * ConjunctionContainsColumnFilter returns true if the query contains an exact
* match (equal) expression on the provided column. The function returns true only * match (equal) expression on the provided column. The function returns true only
* if the match expression has an AND relation with the rest of the expression tree. * if the match expression has an AND relation with the rest of the expression tree.
*
* If the conjunction contains a column filter whose value is a const, distributionKeyValue is set.
*/ */
static bool static bool
ConjunctionContainsColumnFilter(Node *node, Var *column) ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKeyValue)
{ {
if (node == NULL) if (node == NULL)
{ {
@ -311,7 +318,7 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
{ {
OpExpr *opExpr = (OpExpr *) node; OpExpr *opExpr = (OpExpr *) node;
bool distKeyInSimpleOpExpression = bool distKeyInSimpleOpExpression =
DistKeyInSimpleOpExpression((Expr *) opExpr, column); DistKeyInSimpleOpExpression((Expr *) opExpr, column, distributionKeyValue);
if (!distKeyInSimpleOpExpression) if (!distKeyInSimpleOpExpression)
{ {
@ -342,7 +349,8 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
{ {
Node *argumentNode = (Node *) lfirst(argumentCell); Node *argumentNode = (Node *) lfirst(argumentCell);
if (ConjunctionContainsColumnFilter(argumentNode, column)) if (ConjunctionContainsColumnFilter(argumentNode, column,
distributionKeyValue))
{ {
return true; return true;
} }
@ -357,9 +365,11 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
* DistKeyInSimpleOpExpression checks whether given expression is a simple operator * DistKeyInSimpleOpExpression checks whether given expression is a simple operator
* expression with either (dist_key = param) or (dist_key = const). Note that the * expression with either (dist_key = param) or (dist_key = const). Note that the
* operands could be in the reverse order as well. * operands could be in the reverse order as well.
*
* When a const is found, distributionKeyValue is set.
*/ */
static bool static bool
DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn) DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionKeyValue)
{ {
Node *leftOperand = NULL; Node *leftOperand = NULL;
Node *rightOperand = NULL; Node *rightOperand = NULL;
@ -420,6 +430,14 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn)
/* at this point we should have the columnInExpr */ /* at this point we should have the columnInExpr */
Assert(columnInExpr); Assert(columnInExpr);
bool distColumnExists = equal(distColumn, columnInExpr);
if (distColumnExists && constantClause != NULL &&
distColumn->vartype == constantClause->consttype &&
*distributionKeyValue == NULL)
{
/* if the vartypes do not match, let shard pruning handle it later */
*distributionKeyValue = copyObject(constantClause);
}
return equal(distColumn, columnInExpr); return distColumnExists;
} }

View File

@ -95,8 +95,8 @@ contain_param_walker(Node *node, void *context)
* forms involving multiple function calls, FROM clauses, WHERE clauses, * forms involving multiple function calls, FROM clauses, WHERE clauses,
* ... Those complex forms are handled in the coordinator. * ... Those complex forms are handled in the coordinator.
*/ */
DistributedPlan * PlannedStmt *
TryToDelegateFunctionCall(Query *query, bool *hasExternParam) TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
{ {
List *targetList = NIL; List *targetList = NIL;
TargetEntry *targetEntry = NULL; TargetEntry *targetEntry = NULL;
@ -114,12 +114,9 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
StringInfo queryString = NULL; StringInfo queryString = NULL;
Task *task = NULL; Task *task = NULL;
Job *job = NULL; Job *job = NULL;
DistributedPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
struct ParamWalkerContext walkerParamContext = { 0 }; struct ParamWalkerContext walkerParamContext = { 0 };
/* set hasExternParam now in case of early exit */
*hasExternParam = false;
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4)) if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
{ {
/* Citus is not ready to determine whether function is distributed */ /* Citus is not ready to determine whether function is distributed */
@ -133,19 +130,19 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
return NULL; return NULL;
} }
if (query == NULL) if (planContext->query == NULL)
{ {
/* no query (mostly here to be defensive) */ /* no query (mostly here to be defensive) */
return NULL; return NULL;
} }
if (query->commandType != CMD_SELECT) if (planContext->query->commandType != CMD_SELECT)
{ {
/* not a SELECT */ /* not a SELECT */
return NULL; return NULL;
} }
FromExpr *joinTree = query->jointree; FromExpr *joinTree = planContext->query->jointree;
if (joinTree == NULL) if (joinTree == NULL)
{ {
/* no join tree (mostly here to be defensive) */ /* no join tree (mostly here to be defensive) */
@ -174,7 +171,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
if (IsA(reference, RangeTblRef)) if (IsA(reference, RangeTblRef))
{ {
RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable); RangeTblEntry *rtentry = rt_fetch(reference->rtindex,
planContext->query->rtable);
if (rtentry->rtekind != RTE_RESULT) if (rtentry->rtekind != RTE_RESULT)
{ {
/* e.g. SELECT f() FROM rel */ /* e.g. SELECT f() FROM rel */
@ -203,8 +201,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
#endif #endif
} }
targetList = query->targetList; targetList = planContext->query->targetList;
if (list_length(query->targetList) != 1) if (list_length(planContext->query->targetList) != 1)
{ {
/* multiple target list items */ /* multiple target list items */
return NULL; return NULL;
@ -288,7 +286,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
if (partitionParam->paramkind == PARAM_EXTERN) if (partitionParam->paramkind == PARAM_EXTERN)
{ {
/* Don't log a message, we should end up here again without a parameter */ /* Don't log a message, we should end up here again without a parameter */
*hasExternParam = true; DissuadePlannerFromUsingPlan(planContext->plan);
return NULL; return NULL;
} }
} }
@ -354,7 +352,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
if (walkerParamContext.paramKind == PARAM_EXTERN) if (walkerParamContext.paramKind == PARAM_EXTERN)
{ {
/* Don't log a message, we should end up here again without a parameter */ /* Don't log a message, we should end up here again without a parameter */
*hasExternParam = true; DissuadePlannerFromUsingPlan(planContext->plan);
} }
else else
{ {
@ -367,7 +365,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
ereport(DEBUG1, (errmsg("pushing down the function call"))); ereport(DEBUG1, (errmsg("pushing down the function call")));
queryString = makeStringInfo(); queryString = makeStringInfo();
pg_get_query_def(query, queryString); pg_get_query_def(planContext->query, queryString);
task = CitusMakeNode(Task); task = CitusMakeNode(Task);
task->taskType = SELECT_TASK; task->taskType = SELECT_TASK;
@ -378,7 +376,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
job = CitusMakeNode(Job); job = CitusMakeNode(Job);
job->jobId = UniqueJobId(); job->jobId = UniqueJobId();
job->jobQuery = query; job->jobQuery = planContext->query;
job->taskList = list_make1(task); job->taskList = list_make1(task);
distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan = CitusMakeNode(DistributedPlan);
@ -390,5 +388,5 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
/* worker will take care of any necessary locking, treat query as read-only */ /* worker will take care of any necessary locking, treat query as read-only */
distributedPlan->modLevel = ROW_MODIFY_READONLY; distributedPlan->modLevel = ROW_MODIFY_READONLY;
return distributedPlan; return FinalizePlan(planContext->plan, distributedPlan);
} }

View File

@ -441,6 +441,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
plannerRestrictionContext->relationRestrictionContext); plannerRestrictionContext->relationRestrictionContext);
copyOfPlannerRestrictionContext->joinRestrictionContext = copyOfPlannerRestrictionContext->joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext; plannerRestrictionContext->joinRestrictionContext;
copyOfPlannerRestrictionContext->fastPathRestrictionContext =
plannerRestrictionContext->fastPathRestrictionContext;
relationRestrictionList = relationRestrictionList =
copyOfPlannerRestrictionContext->relationRestrictionContext-> copyOfPlannerRestrictionContext->relationRestrictionContext->
relationRestrictionList; relationRestrictionList;

View File

@ -156,7 +156,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
static uint64 GetAnchorShardId(List *relationShardList); static uint64 GetAnchorShardId(List *relationShardList);
static List * TargetShardIntervalForFastPathQuery(Query *query, static List * TargetShardIntervalForFastPathQuery(Query *query,
Const **partitionValueConst, Const **partitionValueConst,
bool *isMultiShardQuery); bool *isMultiShardQuery,
Const *distributionKeyValue);
static List * SingleShardSelectTaskList(Query *query, uint64 jobId, static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList, List *relationShardList, List *placementList,
uint64 shardId); uint64 shardId);
@ -189,6 +190,9 @@ CreateRouterPlan(Query *originalQuery, Query *query,
plannerRestrictionContext); plannerRestrictionContext);
} }
distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
return distributedPlan; return distributedPlan;
} }
@ -552,6 +556,8 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
uint32 queryTableCount = 0; uint32 queryTableCount = 0;
CmdType commandType = queryTree->commandType; CmdType commandType = queryTree->commandType;
bool fastPathRouterQuery =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
Oid distributedTableId = ModifyQueryResultRelationId(queryTree); Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
if (!IsDistributedTable(distributedTableId)) if (!IsDistributedTable(distributedTableId))
@ -575,8 +581,12 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
* rows based on the ctid column. This is a bad idea because ctid of * rows based on the ctid column. This is a bad idea because ctid of
* the rows could be changed before the modification part of * the rows could be changed before the modification part of
* the query is executed. * the query is executed.
*
* We can exclude fast path queries since they cannot have intermediate
* results by definition.
*/ */
if (ContainsReadIntermediateResultFunction((Node *) originalQuery)) if (!fastPathRouterQuery &&
ContainsReadIntermediateResultFunction((Node *) originalQuery))
{ {
bool hasTidColumn = FindNodeCheck((Node *) originalQuery->jointree, IsTidColumn); bool hasTidColumn = FindNodeCheck((Node *) originalQuery->jointree, IsTidColumn);
if (hasTidColumn) if (hasTidColumn)
@ -649,8 +659,15 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
} }
} }
/* extract range table entries */ /*
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); * Extract range table entries for queries that are not fast path. We can skip fast
* path queries because their definition is a single RTE entry, which is a relation,
* so the following check doesn't apply for fast-path queries.
*/
if (!fastPathRouterQuery)
{
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList);
}
foreach(rangeTableCell, rangeTableList) foreach(rangeTableCell, rangeTableList)
{ {
@ -1617,8 +1634,9 @@ ExtractFirstDistributedTableId(Query *query)
List *rangeTableList = query->rtable; List *rangeTableList = query->rtable;
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
Oid distributedTableId = InvalidOid; Oid distributedTableId = InvalidOid;
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(IsModifyCommand(query) || FastPathRouterQuery(query)); Assert(IsModifyCommand(query) || FastPathRouterQuery(query, &distKey));
foreach(rangeTableCell, rangeTableList) foreach(rangeTableCell, rangeTableList)
{ {
@ -2014,6 +2032,8 @@ PlanRouterQuery(Query *originalQuery,
bool shardsPresent = false; bool shardsPresent = false;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
CmdType commandType = originalQuery->commandType; CmdType commandType = originalQuery->commandType;
bool fastPathRouterQuery =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
*placementList = NIL; *placementList = NIL;
@ -2022,11 +2042,15 @@ PlanRouterQuery(Query *originalQuery,
* not been called. Thus, restriction information is not available and we do the * not been called. Thus, restriction information is not available and we do the
* shard pruning based on the distribution column in the quals of the query. * shard pruning based on the distribution column in the quals of the query.
*/ */
if (FastPathRouterQuery(originalQuery)) if (fastPathRouterQuery)
{ {
Const *distributionKeyValue =
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue;
List *shardIntervalList = List *shardIntervalList =
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
&isMultiShardQuery); &isMultiShardQuery, distributionKeyValue);
/* /*
* This could only happen when there is a parameter on the distribution key. * This could only happen when there is a parameter on the distribution key.
@ -2247,21 +2271,34 @@ GetAnchorShardId(List *prunedShardIntervalListList)
*/ */
static List * static List *
TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
bool *isMultiShardQuery) bool *isMultiShardQuery, Const *distributionKeyValue)
{ {
Const *queryPartitionValueConst = NULL;
Oid relationId = ExtractFirstDistributedTableId(query); Oid relationId = ExtractFirstDistributedTableId(query);
if (distributionKeyValue)
{
DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId);
ShardInterval *shardInterval =
FindShardInterval(distributionKeyValue->constvalue, cache);
if (partitionValueConst != NULL)
{
/* set the outgoing partition column value if requested */
*partitionValueConst = distributionKeyValue;
}
return list_make1(shardInterval);
}
Node *quals = query->jointree->quals; Node *quals = query->jointree->quals;
int relationIndex = 1; int relationIndex = 1;
Const *queryPartitionValueConst = NULL;
List *prunedShardIntervalList = List *prunedShardIntervalList =
PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals), PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals),
&queryPartitionValueConst); &queryPartitionValueConst);
/* we're only expecting single shard from a single table */ /* we're only expecting single shard from a single table */
Assert(FastPathRouterQuery(query)); Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(FastPathRouterQuery(query, &distKey));
if (list_length(prunedShardIntervalList) > 1) if (list_length(prunedShardIntervalList) > 1)
{ {

View File

@ -195,6 +195,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(subPlanList); WRITE_NODE_FIELD(subPlanList);
WRITE_NODE_FIELD(usedSubPlanNodeList); WRITE_NODE_FIELD(usedSubPlanNodeList);
WRITE_BOOL_FIELD(fastPathRouterPlan);
WRITE_NODE_FIELD(planningError); WRITE_NODE_FIELD(planningError);
} }

View File

@ -223,6 +223,7 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_NODE_FIELD(subPlanList); READ_NODE_FIELD(subPlanList);
READ_NODE_FIELD(usedSubPlanNodeList); READ_NODE_FIELD(usedSubPlanNodeList);
READ_BOOL_FIELD(fastPathRouterPlan);
READ_NODE_FIELD(planningError); READ_NODE_FIELD(planningError);

View File

@ -85,10 +85,31 @@ typedef struct JoinRestriction
RelOptInfo *outerrel; RelOptInfo *outerrel;
} JoinRestriction; } JoinRestriction;
/*
* FastPathRestrictionContext carries the outcome of the fast-path check so
* that later planning steps can read it from the planner restriction context
* instead of re-evaluating the query.
*/
typedef struct FastPathRestrictionContext
{
/* true when the query was identified as a fast-path router query */
bool fastPathRouterQuery;
/*
* While calculating fastPathRouterQuery, we are sometimes able to
* extract the distribution key value as well (such as when there are
* no prepared statements). This is NULL when the distribution key
* contains a parameter, so check for NULL before using it.
*/
Const *distributionKeyValue;
}FastPathRestrictionContext;
typedef struct PlannerRestrictionContext typedef struct PlannerRestrictionContext
{ {
RelationRestrictionContext *relationRestrictionContext; RelationRestrictionContext *relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext; JoinRestrictionContext *joinRestrictionContext;
/*
* When the query is qualified for fast path, we don't have
* the RelationRestrictionContext and JoinRestrictionContext
* since those depend on calling standard_planner.
* Instead, we keep this struct to pass some extra information.
*/
FastPathRestrictionContext *fastPathRestrictionContext;
bool hasSemiJoin; bool hasSemiJoin;
MemoryContext memoryContext; MemoryContext memoryContext;
} PlannerRestrictionContext; } PlannerRestrictionContext;
@ -108,6 +129,33 @@ typedef struct RelationRowLock
} RelationRowLock; } RelationRowLock;
/*
* DistributedPlanningContext groups the planner inputs and the intermediate
* results of a single distributed planning call, so they can be passed
* between planning functions as one argument.
*/
typedef struct DistributedPlanningContext
{
/* The parsed query that is given to the planner. It is slightly modified
* to work with the standard_planner. */
Query *query;
/* A copy of the original parsed query that is given to the planner. This
* doesn't contain most of the changes that are made to parse. There is one
* change that is made for non fast path router queries though, which
* is the assigning of RTE identities using AssignRTEIdentities. This is
* NULL for non distributed plans, since those don't need it. */
Query *originalQuery;
/* the cursor options given to the planner */
int cursorOptions;
/* the ParamListInfo that is given to the planner */
ParamListInfo boundParams;
/* Plan created either by standard_planner or by FastPathPlanner */
PlannedStmt *plan;
/* Our custom restriction context */
PlannerRestrictionContext *plannerRestrictionContext;
} DistributedPlanningContext;
extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
ParamListInfo boundParams); ParamListInfo boundParams);
extern List * ExtractRangeTableEntryList(Query *query); extern List * ExtractRangeTableEntryList(Query *query);
@ -129,5 +177,8 @@ extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int GetRTEIdentity(RangeTblEntry *rte); extern int GetRTEIdentity(RangeTblEntry *rte);
extern int32 BlessRecordExpression(Expr *expr); extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
#endif /* DISTRIBUTED_PLANNER_H */ #endif /* DISTRIBUTED_PLANNER_H */

View File

@ -10,10 +10,12 @@
#define FUNCTION_CALL_DELEGATION_H #define FUNCTION_CALL_DELEGATION_H
#include "postgres.h" #include "postgres.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam); PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);
#endif /* FUNCTION_CALL_DELEGATION_H */ #endif /* FUNCTION_CALL_DELEGATION_H */

View File

@ -310,6 +310,12 @@ typedef struct DistributedPlan
*/ */
List *usedSubPlanNodeList; List *usedSubPlanNodeList;
/*
* True when the query is simple enough that we don't need to call
* standard_planner(). See FastPathRouterQuery() for the definition.
*/
bool fastPathRouterPlan;
/* /*
* NULL if this a valid plan, an error description otherwise. This will * NULL if this a valid plan, an error description otherwise. This will
* e.g. be set if SQL features are present that a planner doesn't support, * e.g. be set if SQL features are present that a planner doesn't support,

View File

@ -78,6 +78,6 @@ extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams); boundParams);
extern bool FastPathRouterQuery(Query *query); extern bool FastPathRouterQuery(Query *query, Const **distributionKeyValue);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -61,6 +61,7 @@ DELETE FROM modify_fast_path WHERE key = 1 and FALSE;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
-- UPDATE may include complex target entries -- UPDATE may include complex target entries
UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query