From ca293116fa6521e383147abd07d035fdc49f110e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 19 Dec 2019 10:59:42 +0100 Subject: [PATCH 1/5] Reduce calls to FastPathRouterQuery() Before this commit, we called it twice durning planning. Instead, we save the information and pass it. --- .../distributed/planner/distributed_planner.c | 17 ++++++++++++++++- .../distributed/planner/insert_select_planner.c | 3 +++ .../distributed/planner/multi_router_planner.c | 4 +++- src/include/distributed/distributed_planner.h | 13 +++++++++++++ 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 07641f272..2735d3562 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -128,6 +128,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) bool setPartitionedTablesInherited = false; List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; + bool fastPathRouterQuery = false; if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { @@ -151,6 +152,10 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) else { needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); + if (needsDistributedPlanning) + { + fastPathRouterQuery = FastPathRouterQuery(parse); + } } } @@ -215,8 +220,11 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * transformations made by postgres' planner. */ - if (needsDistributedPlanning && FastPathRouterQuery(originalQuery)) + if (needsDistributedPlanning && fastPathRouterQuery) { + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = + true; + result = FastPathPlanner(originalQuery, parse, boundParams); } else @@ -1870,6 +1878,9 @@ CreateAndPushPlannerRestrictionContext(void) plannerRestrictionContext->joinRestrictionContext = palloc0(sizeof(JoinRestrictionContext)); + plannerRestrictionContext->fastPathRestrictionContext = + palloc0(sizeof(FastPathRestrictionContext)); + plannerRestrictionContext->memoryContext = CurrentMemoryContext; /* we'll apply logical AND as we add tables */ @@ -1929,6 +1940,10 @@ ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionCont plannerRestrictionContext->joinRestrictionContext = palloc0(sizeof(JoinRestrictionContext)); + plannerRestrictionContext->fastPathRestrictionContext = + palloc0(sizeof(FastPathRestrictionContext)); + + /* we'll apply logical AND as we add tables */ plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true; } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index b216a42d6..03ebd0e59 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -441,6 +441,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter plannerRestrictionContext->relationRestrictionContext); copyOfPlannerRestrictionContext->joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext; + copyOfPlannerRestrictionContext->fastPathRestrictionContext = + plannerRestrictionContext->fastPathRestrictionContext; + relationRestrictionList = copyOfPlannerRestrictionContext->relationRestrictionContext-> relationRestrictionList; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 1a2694a53..fe534c053 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2014,6 +2014,8 @@ PlanRouterQuery(Query *originalQuery, bool shardsPresent = false; uint64 shardId = INVALID_SHARD_ID; CmdType commandType = originalQuery->commandType; + bool fastPathRouterQuery = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; *placementList = NIL; @@ -2022,7 +2024,7 @@ PlanRouterQuery(Query *originalQuery, * not been called. Thus, restriction information is not avaliable and we do the * shard pruning based on the distribution column in the quals of the query. */ - if (FastPathRouterQuery(originalQuery)) + if (fastPathRouterQuery) { List *shardIntervalList = TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index d620e47e5..880781ec0 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -85,10 +85,23 @@ typedef struct JoinRestriction RelOptInfo *outerrel; } JoinRestriction; +typedef struct FastPathRestrictionContext +{ + bool fastPathRouterQuery; +}FastPathRestrictionContext; + typedef struct PlannerRestrictionContext { RelationRestrictionContext *relationRestrictionContext; JoinRestrictionContext *joinRestrictionContext; + + /* + * When the query is qualified for fast path, we don't have + * the RelationRestrictionContext and JoinRestrictionContext + * since those are dependent to calling standard_planner. + * Instead, we keep this struct to pass some extra information. + */ + FastPathRestrictionContext *fastPathRestrictionContext; bool hasSemiJoin; MemoryContext memoryContext; } PlannerRestrictionContext; From 7f3ab7892dd59e8dab9c763c5c6fd74ee3678c02 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 19 Dec 2019 11:20:59 +0100 Subject: [PATCH 2/5] Skip shard pruning when possible We're already traversing the queryTree and finding the distribution key value, so pass it to the later stages of the planning. --- .../distributed/planner/distributed_planner.c | 5 ++- .../planner/fast_path_router_planner.c | 38 ++++++++++++++----- .../planner/multi_router_planner.c | 35 ++++++++++++++--- src/include/distributed/distributed_planner.h | 8 ++++ .../distributed/multi_router_planner.h | 2 +- .../expected/fast_path_router_modify.out | 1 + 6 files changed, 72 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 2735d3562..ce45416e5 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -129,6 +129,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; bool fastPathRouterQuery = false; + Const *distributionKeyValue = NULL; if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { @@ -154,7 +155,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); if (needsDistributedPlanning) { - fastPathRouterQuery = FastPathRouterQuery(parse); + fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); } } } @@ -224,6 +225,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = true; + plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = + distributionKeyValue; result = FastPathPlanner(originalQuery, parse, boundParams); } diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 3e0b3f20a..8dc6f5225 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -58,8 +58,10 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); -static bool ConjunctionContainsColumnFilter(Node *node, Var *column); -static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn); +static bool ConjunctionContainsColumnFilter(Node *node, Var *column, + Const **distributionKeyValue); +static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, + Const **distributionKeyValue); /* @@ -122,7 +124,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse) SeqScan *seqScanNode = makeNode(SeqScan); Plan *plan = &seqScanNode->plan; - AssertArg(FastPathRouterQuery(parse)); + Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; + + AssertArg(FastPathRouterQuery(parse, &distKey)); /* there is only a single relation rte */ seqScanNode->scanrelid = 1; @@ -162,11 +166,12 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * and it should be ANDed with any other filters. Also, the distribution * key should only exists once in the WHERE clause. So basically, * SELECT ... FROM dist_table WHERE dist_key = X + * If the filter is a const, distributionKeyValue is set * - All INSERT statements (including multi-row INSERTs) as long as the commands * don't have any sublinks/CTEs etc */ bool -FastPathRouterQuery(Query *query) +FastPathRouterQuery(Query *query, Const **distributionKeyValue) { FromExpr *joinTree = query->jointree; Node *quals = NULL; @@ -254,7 +259,7 @@ FastPathRouterQuery(Query *query) * This is to simplify both of the individual checks and omit various edge cases * that might arise with multiple distribution keys in the quals. */ - if (ConjunctionContainsColumnFilter(quals, distributionKey) && + if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) && !ColumnAppearsMultipleTimes(quals, distributionKey)) { return true; @@ -298,9 +303,11 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * ConjunctionContainsColumnFilter returns true if the query contains an exact * match (equal) expression on the provided column. The function returns true only * if the match expression has an AND relation with the rest of the expression tree. + * + * If the conjuction contains column filter which is const, distributionKeyValue is set. */ static bool -ConjunctionContainsColumnFilter(Node *node, Var *column) +ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKeyValue) { if (node == NULL) { @@ -311,7 +318,7 @@ ConjunctionContainsColumnFilter(Node *node, Var *column) { OpExpr *opExpr = (OpExpr *) node; bool distKeyInSimpleOpExpression = - DistKeyInSimpleOpExpression((Expr *) opExpr, column); + DistKeyInSimpleOpExpression((Expr *) opExpr, column, distributionKeyValue); if (!distKeyInSimpleOpExpression) { @@ -342,7 +349,8 @@ ConjunctionContainsColumnFilter(Node *node, Var *column) { Node *argumentNode = (Node *) lfirst(argumentCell); - if (ConjunctionContainsColumnFilter(argumentNode, column)) + if (ConjunctionContainsColumnFilter(argumentNode, column, + distributionKeyValue)) { return true; } @@ -357,9 +365,11 @@ ConjunctionContainsColumnFilter(Node *node, Var *column) * DistKeyInSimpleOpExpression checks whether given expression is a simple operator * expression with either (dist_key = param) or (dist_key = const). Note that the * operands could be in the reverse order as well. + * + * When a const is found, distributionKeyValue is set. */ static bool -DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn) +DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionKeyValue) { Node *leftOperand = NULL; Node *rightOperand = NULL; @@ -420,6 +430,14 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn) /* at this point we should have the columnInExpr */ Assert(columnInExpr); + bool distColumnExists = equal(distColumn, columnInExpr); + if (distColumnExists && constantClause != NULL && + distColumn->vartype == constantClause->consttype && + *distributionKeyValue == NULL) + { + /* if the vartypes do not match, let shard pruning handle it later */ + *distributionKeyValue = copyObject(constantClause); + } - return equal(distColumn, columnInExpr); + return distColumnExists; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index fe534c053..99d884a65 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1617,8 +1617,9 @@ ExtractFirstDistributedTableId(Query *query) List *rangeTableList = query->rtable; ListCell *rangeTableCell = NULL; Oid distributedTableId = InvalidOid; + Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; - Assert(IsModifyCommand(query) || FastPathRouterQuery(query)); + Assert(IsModifyCommand(query) || FastPathRouterQuery(query, &distKey)); foreach(rangeTableCell, rangeTableList) { @@ -2026,9 +2027,32 @@ PlanRouterQuery(Query *originalQuery, */ if (fastPathRouterQuery) { - List *shardIntervalList = - TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, - &isMultiShardQuery); + List *shardIntervalList = NIL; + Const *distributionKeyValue = + plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue; + + if (distributionKeyValue) + { + Oid relationId = ExtractFirstDistributedTableId(originalQuery); + DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId); + ShardInterval *shardInterval = + FindShardInterval(distributionKeyValue->constvalue, cache); + + shardIntervalList = list_make1(shardInterval); + + if (partitionValueConst != NULL) + { + /* set the outgoing partition column value if requested */ + *partitionValueConst = distributionKeyValue; + } + } + else + { + shardIntervalList = + TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, + &isMultiShardQuery); + } + /* * This could only happen when there is a parameter on the distribution key. @@ -2263,7 +2287,8 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, &queryPartitionValueConst); /* we're only expecting single shard from a single table */ - Assert(FastPathRouterQuery(query)); + Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; + Assert(FastPathRouterQuery(query, &distKey)); if (list_length(prunedShardIntervalList) > 1) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 880781ec0..8513b7d60 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -88,6 +88,14 @@ typedef struct JoinRestriction typedef struct FastPathRestrictionContext { bool fastPathRouterQuery; + + /* + * While calculating fastPathRouterQuery, we could sometimes be + * able to extract the distribution key value as well (such as when + * there are no prepared statements). Could be NULL when the distribution + * key contains parameter, so check for it before using. + */ + Const *distributionKeyValue; }FastPathRestrictionContext; typedef struct PlannerRestrictionContext diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 68c071de7..650df0065 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -78,6 +78,6 @@ extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams); -extern bool FastPathRouterQuery(Query *query); +extern bool FastPathRouterQuery(Query *query, Const **distributionKeyValue); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/fast_path_router_modify.out b/src/test/regress/expected/fast_path_router_modify.out index c83c9dca0..96745a84b 100644 --- a/src/test/regress/expected/fast_path_router_modify.out +++ b/src/test/regress/expected/fast_path_router_modify.out @@ -61,6 +61,7 @@ DELETE FROM modify_fast_path WHERE key = 1 and FALSE; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DETAIL: distribution column value: 1 -- UPDATE may include complex target entries UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1; DEBUG: Distributed planning for a fast-path router query From 13a9b55695fdb40d7b777bf12c59da00ac9a2fa1 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 19 Dec 2019 15:16:58 +0100 Subject: [PATCH 3/5] Skip expensive checks when fast-path query The definition of fast-path query is very strict. So, we don't need to do some extra checks. --- .../distributed/planner/distributed_planner.c | 63 +++++++++++++------ .../planner/multi_router_planner.c | 19 +++++- 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ce45416e5..ff9305010 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -79,7 +79,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); -static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery); +static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery, + bool fastPathRouterQuery); static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -168,7 +169,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * don't have a way of doing both things and therefore error out, but do * have a handy tip for users. */ - if (InsertSelectIntoLocalTable(parse)) + if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse)) { ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " "local table"), @@ -183,13 +184,27 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * set, which doesn't break our goals, but, prevents us keeping an extra copy * of the query tree. Note that we copy the query tree once we're sure it's a * distributed query. + * + * Since fast-path queries do not through standard planner, we skip unnecessary + * parts in that case. */ - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - originalQuery = copyObject(parse); + if (!fastPathRouterQuery) + { + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + originalQuery = copyObject(parse); - setPartitionedTablesInherited = false; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); + setPartitionedTablesInherited = false; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); + } + else + { + /* + * We still need to copy the parse tree because the FastPathPlanner + * modifies it. + */ + originalQuery = copyObject(parse); + } } /* @@ -249,9 +264,12 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse, boundParams, plannerRestrictionContext); - setPartitionedTablesInherited = true; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); + if (!fastPathRouterQuery) + { + setPartitionedTablesInherited = true; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); + } } else { @@ -667,7 +685,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi { DistributedPlan *distributedPlan = NULL; bool hasCtes = originalQuery->cteList != NIL; - + bool fastPathRouterQuery = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; if (IsModifyCommand(originalQuery)) { @@ -703,7 +722,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); return distributedPlan; } @@ -725,7 +744,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); return distributedPlan; } @@ -819,7 +838,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); distributedPlan->subPlanList = subPlanList; - FinalizeDistributedPlan(distributedPlan, originalQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); return distributedPlan; } @@ -831,7 +850,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi */ if (IsModifyCommand(originalQuery)) { - FinalizeDistributedPlan(distributedPlan, originalQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); return distributedPlan; } @@ -864,7 +883,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* distributed plan currently should always succeed or error out */ Assert(distributedPlan && distributedPlan->planningError == NULL); - FinalizeDistributedPlan(distributedPlan, originalQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); return distributedPlan; } @@ -875,9 +894,17 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi * currently only implements some optimizations for intermediate result(s) pruning. */ static void -FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery) +FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery, + bool fastPathRouterQuery) { - RecordSubPlansUsedInPlan(plan, originalQuery); + /* + * Fast path queries, we cannot have any subplans by their definition, + * so skip expensive traversals. + */ + if (!fastPathRouterQuery) + { + RecordSubPlansUsedInPlan(plan, originalQuery); + } } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 99d884a65..26d4c2438 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -552,6 +552,8 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer ListCell *rangeTableCell = NULL; uint32 queryTableCount = 0; CmdType commandType = queryTree->commandType; + bool fastPathRouterQuery = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; Oid distributedTableId = ModifyQueryResultRelationId(queryTree); if (!IsDistributedTable(distributedTableId)) @@ -575,8 +577,12 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer * rows based on the ctid column. This is a bad idea because ctid of * the rows could be changed before the modification part of * the query is executed. + * + * We can exclude fast path queries since they cannot have intermediate + * results by definition. */ - if (ContainsReadIntermediateResultFunction((Node *) originalQuery)) + if (!fastPathRouterQuery && + ContainsReadIntermediateResultFunction((Node *) originalQuery)) { bool hasTidColumn = FindNodeCheck((Node *) originalQuery->jointree, IsTidColumn); if (hasTidColumn) @@ -649,8 +655,15 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } } - /* extract range table entries */ - ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); + /* + * Extract range table entries for queries that are not fast path. We can skip fast + * path queries because their definition is a single RTE entry, which is a relation, + * so the following check doesn't apply for fast-path queries. + */ + if (!fastPathRouterQuery) + { + ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); + } foreach(rangeTableCell, rangeTableList) { From 5a1e7527260f79df7ff8a8492c42be18e08d7448 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 23 Dec 2019 18:44:29 +0100 Subject: [PATCH 4/5] Apply feedback - add fastPath field to plan --- .../distributed/planner/distributed_planner.c | 20 ++++++++----------- .../planner/multi_router_planner.c | 3 +++ .../distributed/utils/citus_outfuncs.c | 1 + .../distributed/utils/citus_readfuncs.c | 1 + .../distributed/multi_physical_planner.h | 6 ++++++ 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ff9305010..7d3752e96 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -79,8 +79,7 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); -static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery, - bool fastPathRouterQuery); +static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery); static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -685,8 +684,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi { DistributedPlan *distributedPlan = NULL; bool hasCtes = originalQuery->cteList != NIL; - bool fastPathRouterQuery = - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; if (IsModifyCommand(originalQuery)) { @@ -722,7 +719,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery); return distributedPlan; } @@ -744,7 +741,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery); return distributedPlan; } @@ -838,7 +835,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); distributedPlan->subPlanList = subPlanList; - FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery); return distributedPlan; } @@ -850,7 +847,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi */ if (IsModifyCommand(originalQuery)) { - FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery); return distributedPlan; } @@ -883,7 +880,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* distributed plan currently should always succeed or error out */ Assert(distributedPlan && distributedPlan->planningError == NULL); - FinalizeDistributedPlan(distributedPlan, originalQuery, fastPathRouterQuery); + FinalizeDistributedPlan(distributedPlan, originalQuery); return distributedPlan; } @@ -894,14 +891,13 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi * currently only implements some optimizations for intermediate result(s) pruning. */ static void -FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery, - bool fastPathRouterQuery) +FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery) { /* * Fast path queries, we cannot have any subplans by their definition, * so skip expensive traversals. */ - if (!fastPathRouterQuery) + if (!plan->fastPathRouterPlan) { RecordSubPlansUsedInPlan(plan, originalQuery); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 26d4c2438..a31320d62 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -189,6 +189,9 @@ CreateRouterPlan(Query *originalQuery, Query *query, plannerRestrictionContext); } + distributedPlan->fastPathRouterPlan = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; + return distributedPlan; } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 6cf21078e..942852a78 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -195,6 +195,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(subPlanList); WRITE_NODE_FIELD(usedSubPlanNodeList); + WRITE_BOOL_FIELD(fastPathRouterPlan); WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index f3ceef7ac..d5661427b 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -223,6 +223,7 @@ ReadDistributedPlan(READFUNC_ARGS) READ_NODE_FIELD(subPlanList); READ_NODE_FIELD(usedSubPlanNodeList); + READ_BOOL_FIELD(fastPathRouterPlan); READ_NODE_FIELD(planningError); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9862121bb..344fbb315 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -310,6 +310,12 @@ typedef struct DistributedPlan */ List *usedSubPlanNodeList; + /* + * When the query is very simple such that we don't need to call + * standard_planner(). See FastPathRouterQuery() for the definition. + */ + bool fastPathRouterPlan; + /* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support, From 5b0baea72cab9e129c9448a658f9327cadbe2f2d Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 27 Dec 2019 13:47:49 +0100 Subject: [PATCH 5/5] Refactor distributed_planner for better understandability --- .../distributed/planner/distributed_planner.c | 247 ++++++++++-------- .../planner/function_call_delegation.c | 32 ++- .../planner/multi_router_planner.c | 50 ++-- src/include/distributed/distributed_planner.h | 30 +++ .../distributed/function_call_delegation.h | 4 +- 5 files changed, 201 insertions(+), 162 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 7d3752e96..4aef0b89c 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,14 +66,10 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; - static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); -static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, - Query *originalQuery, Query *query, - ParamListInfo boundParams, - PlannerRestrictionContext * - plannerRestrictionContext); +static PlannedStmt * CreateDistributedPlannedStmt( + DistributedPlanningContext *planContext); static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, @@ -88,8 +84,6 @@ static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AdjustPartitioningForDistributedPlanning(List *rangeTableList, bool setPartitionedTablesInherited); -static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, - DistributedPlan *distributedPlan); static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan); @@ -117,6 +111,10 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); +static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, + Const *distributionKeyValue); +static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, + List *rangeTableList, int rteIdCounter); /* Distributed planner hook */ PlannedStmt * @@ -124,12 +122,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result = NULL; bool needsDistributedPlanning = false; - Query *originalQuery = NULL; bool setPartitionedTablesInherited = false; List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; bool fastPathRouterQuery = false; Const *distributionKeyValue = NULL; + DistributedPlanningContext planContext = { + .query = parse, + .cursorOptions = cursorOptions, + .boundParams = boundParams, + }; if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { @@ -160,7 +162,17 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } } - if (needsDistributedPlanning) + if (fastPathRouterQuery) + { + /* + * We need to copy the parse tree because the FastPathPlanner modifies + * it. In the next branch we do the same for other distributed queries + * too, but for those it needs to be done AFTER calling + * AssignRTEIdentities. + */ + planContext.originalQuery = copyObject(parse); + } + else if (needsDistributedPlanning) { /* * Inserting into a local table needs to go through the regular postgres @@ -168,7 +180,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * don't have a way of doing both things and therefore error out, but do * have a handy tip for users. */ - if (!fastPathRouterQuery && InsertSelectIntoLocalTable(parse)) + if (InsertSelectIntoLocalTable(parse)) { ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " "local table"), @@ -179,31 +191,16 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* * standard_planner scribbles on it's input, but for deparsing we need the - * unmodified form. Note that we keep RTE_RELATIONs with their identities - * set, which doesn't break our goals, but, prevents us keeping an extra copy - * of the query tree. Note that we copy the query tree once we're sure it's a - * distributed query. - * - * Since fast-path queries do not through standard planner, we skip unnecessary - * parts in that case. + * unmodified form. Note that before copying we call + * AssignRTEIdentities, which is needed because these identities need + * to be present in the copied query too. */ - if (!fastPathRouterQuery) - { - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); - originalQuery = copyObject(parse); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + planContext.originalQuery = copyObject(parse); - setPartitionedTablesInherited = false; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - else - { - /* - * We still need to copy the parse tree because the FastPathPlanner - * modifies it. - */ - originalQuery = copyObject(parse); - } + setPartitionedTablesInherited = false; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); } /* @@ -213,83 +210,42 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ReplaceTableVisibleFunction((Node *) parse); /* create a restriction context and put it at the end if context list */ - PlannerRestrictionContext *plannerRestrictionContext = - CreateAndPushPlannerRestrictionContext(); + planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + + /* + * We keep track of how many times we've recursed into the planner, primarily + * to detect whether we are in a function call. We need to make sure that the + * PlannerLevel is decremented exactly once at the end of the next PG_TRY + * block, both in the happy case and when an error occurs. + */ + PlannerLevel++; + PG_TRY(); { - /* - * We keep track of how many times we've recursed into the planner, primarily - * to detect whether we are in a function call. We need to make sure that the - * PlannerLevel is decremented exactly once at the end of this PG_TRY block, - * both in the happy case and when an error occurs. - */ - PlannerLevel++; - - /* - * For trivial queries, we're skipping the standard_planner() in - * order to eliminate its overhead. - * - * Otherwise, call into standard planner. This is required because the Citus - * planner relies on both the restriction information per table and parse tree - * transformations made by postgres' planner. - */ - - if (needsDistributedPlanning && fastPathRouterQuery) + if (fastPathRouterQuery) { - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery = - true; - plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue = - distributionKeyValue; - - result = FastPathPlanner(originalQuery, parse, boundParams); + result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); } else { - result = standard_planner(parse, cursorOptions, boundParams); - + /* + * Call into standard_planner because the Citus planner relies on both the + * restriction information per table and parse tree transformations made by + * postgres' planner. + */ + planContext.plan = standard_planner(planContext.query, + planContext.cursorOptions, + planContext.boundParams); if (needsDistributedPlanning) { - /* may've inlined new relation rtes */ - rangeTableList = ExtractRangeTableEntryList(parse); - rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter); + } + else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL) + { + result = planContext.plan; } } - - if (needsDistributedPlanning) - { - uint64 planId = NextPlanId++; - - result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse, - boundParams, plannerRestrictionContext); - - if (!fastPathRouterQuery) - { - setPartitionedTablesInherited = true; - AdjustPartitioningForDistributedPlanning(rangeTableList, - setPartitionedTablesInherited); - } - } - else - { - bool hasExternParam = false; - DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse, - &hasExternParam); - if (delegatePlan != NULL) - { - result = FinalizePlan(result, delegatePlan); - } - else if (hasExternParam) - { - /* - * As in CreateDistributedPlannedStmt, try dissuade planner when planning - * potentially failed due to unresolved prepared statement parameters. - */ - result->planTree->total_cost = FLT_MAX / 100000000; - } - } - - PlannerLevel--; } PG_CATCH(); { @@ -301,6 +257,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } PG_END_TRY(); + PlannerLevel--; /* remove the context from the context list */ PopPlannerRestrictionContext(); @@ -578,30 +535,91 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) } +/* + * PlanFastPathDistributedStmt creates a distributed planned statement using + * the FastPathPlanner. + */ +static PlannedStmt * +PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, + Const *distributionKeyValue) +{ + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + fastPathRouterQuery = true; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + distributionKeyValue = distributionKeyValue; + + planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, + planContext->boundParams); + + return CreateDistributedPlannedStmt(planContext); +} + + +/* + * PlanDistributedStmt creates a distributed planned statement using the PG + * planner. + */ +static PlannedStmt * +PlanDistributedStmt(DistributedPlanningContext *planContext, + List *rangeTableList, + int rteIdCounter) +{ + /* may've inlined new relation rtes */ + rangeTableList = ExtractRangeTableEntryList(planContext->query); + rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); + + + PlannedStmt *result = CreateDistributedPlannedStmt(planContext); + + bool setPartitionedTablesInherited = true; + AdjustPartitioningForDistributedPlanning(rangeTableList, + setPartitionedTablesInherited); + + return result; +} + + +/* + * DissuadePlannerFromUsingPlan try dissuade planner when planning a plan that + * potentially failed due to unresolved prepared statement parameters. + */ +void +DissuadePlannerFromUsingPlan(PlannedStmt *plan) +{ + /* + * Arbitrarily high cost, but low enough that it can be added up + * without overflowing by choose_custom_plan(). + */ + plan->planTree->total_cost = FLT_MAX / 100000000; +} + + /* * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular * query into a distributed plan that is encapsulated by a PlannedStmt. */ static PlannedStmt * -CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, - Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) +CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) { + uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; JoinRestrictionContext *joinRestrictionContext = - plannerRestrictionContext->joinRestrictionContext; + planContext->plannerRestrictionContext->joinRestrictionContext; - if (HasUnresolvedExternParamsWalker((Node *) originalQuery, boundParams)) + if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery, + planContext->boundParams)) { hasUnresolvedParams = true; } - plannerRestrictionContext->joinRestrictionContext = + planContext->plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); DistributedPlan *distributedPlan = - CreateDistributedPlan(planId, originalQuery, query, boundParams, - hasUnresolvedParams, plannerRestrictionContext); + CreateDistributedPlan(planId, planContext->originalQuery, planContext->query, + planContext->boundParams, + hasUnresolvedParams, + planContext->plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. @@ -646,7 +664,7 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - PlannedStmt *resultPlan = FinalizePlan(localPlan, distributedPlan); + PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -654,14 +672,11 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && + (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + distributedPlan))) && hasUnresolvedParams) { - /* - * Arbitrarily high cost, but low enough that it can be added up - * without overflowing by choose_custom_plan(). - */ - resultPlan->planTree->total_cost = FLT_MAX / 100000000; + DissuadePlannerFromUsingPlan(resultPlan); } return resultPlan; @@ -1099,7 +1114,7 @@ GetDistributedPlan(CustomScan *customScan) * FinalizePlan combines local plan with distributed plan and creates a plan * which can be run by the PostgreSQL executor. */ -static PlannedStmt * +PlannedStmt * FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) { PlannedStmt *finalPlan = NULL; diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 8201abb32..0a4c5a8da 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -95,8 +95,8 @@ contain_param_walker(Node *node, void *context) * forms involving multiple function calls, FROM clauses, WHERE clauses, * ... Those complex forms are handled in the coordinator. */ -DistributedPlan * -TryToDelegateFunctionCall(Query *query, bool *hasExternParam) +PlannedStmt * +TryToDelegateFunctionCall(DistributedPlanningContext *planContext) { List *targetList = NIL; TargetEntry *targetEntry = NULL; @@ -114,12 +114,9 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) StringInfo queryString = NULL; Task *task = NULL; Job *job = NULL; - DistributedPlan *distributedPlan = NULL; + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); struct ParamWalkerContext walkerParamContext = { 0 }; - /* set hasExternParam now in case of early exit */ - *hasExternParam = false; - if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4)) { /* Citus is not ready to determine whether function is distributed */ @@ -133,19 +130,19 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) return NULL; } - if (query == NULL) + if (planContext->query == NULL) { /* no query (mostly here to be defensive) */ return NULL; } - if (query->commandType != CMD_SELECT) + if (planContext->query->commandType != CMD_SELECT) { /* not a SELECT */ return NULL; } - FromExpr *joinTree = query->jointree; + FromExpr *joinTree = planContext->query->jointree; if (joinTree == NULL) { /* no join tree (mostly here to be defensive) */ @@ -174,7 +171,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (IsA(reference, RangeTblRef)) { - RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable); + RangeTblEntry *rtentry = rt_fetch(reference->rtindex, + planContext->query->rtable); if (rtentry->rtekind != RTE_RESULT) { /* e.g. SELECT f() FROM rel */ @@ -203,8 +201,8 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) #endif } - targetList = query->targetList; - if (list_length(query->targetList) != 1) + targetList = planContext->query->targetList; + if (list_length(planContext->query->targetList) != 1) { /* multiple target list items */ return NULL; @@ -288,7 +286,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (partitionParam->paramkind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); return NULL; } } @@ -354,7 +352,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) if (walkerParamContext.paramKind == PARAM_EXTERN) { /* Don't log a message, we should end up here again without a parameter */ - *hasExternParam = true; + DissuadePlannerFromUsingPlan(planContext->plan); } else { @@ -367,7 +365,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) ereport(DEBUG1, (errmsg("pushing down the function call"))); queryString = makeStringInfo(); - pg_get_query_def(query, queryString); + pg_get_query_def(planContext->query, queryString); task = CitusMakeNode(Task); task->taskType = SELECT_TASK; @@ -378,7 +376,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) job = CitusMakeNode(Job); job->jobId = UniqueJobId(); - job->jobQuery = query; + job->jobQuery = planContext->query; job->taskList = list_make1(task); distributedPlan = CitusMakeNode(DistributedPlan); @@ -390,5 +388,5 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) /* worker will take care of any necessary locking, treat query as read-only */ distributedPlan->modLevel = ROW_MODIFY_READONLY; - return distributedPlan; + return FinalizePlan(planContext->plan, distributedPlan); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a31320d62..59b368350 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -156,7 +156,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, static uint64 GetAnchorShardId(List *relationShardList); static List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, - bool *isMultiShardQuery); + bool *isMultiShardQuery, + Const *distributionKeyValue); static List * SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId); @@ -2043,31 +2044,12 @@ PlanRouterQuery(Query *originalQuery, */ if (fastPathRouterQuery) { - List *shardIntervalList = NIL; Const *distributionKeyValue = plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue; - if (distributionKeyValue) - { - Oid relationId = ExtractFirstDistributedTableId(originalQuery); - DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId); - ShardInterval *shardInterval = - FindShardInterval(distributionKeyValue->constvalue, cache); - - shardIntervalList = list_make1(shardInterval); - - if (partitionValueConst != NULL) - { - /* set the outgoing partition column value if requested */ - *partitionValueConst = distributionKeyValue; - } - } - else - { - shardIntervalList = - TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, - &isMultiShardQuery); - } + List *shardIntervalList = + TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, + &isMultiShardQuery, distributionKeyValue); /* @@ -2289,15 +2271,27 @@ GetAnchorShardId(List *prunedShardIntervalListList) */ static List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, - bool *isMultiShardQuery) + bool *isMultiShardQuery, Const *distributionKeyValue) { - Const *queryPartitionValueConst = NULL; - Oid relationId = ExtractFirstDistributedTableId(query); + + if (distributionKeyValue) + { + DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId); + ShardInterval *shardInterval = + FindShardInterval(distributionKeyValue->constvalue, cache); + + if (partitionValueConst != NULL) + { + /* set the outgoing partition column value if requested */ + *partitionValueConst = distributionKeyValue; + } + return list_make1(shardInterval); + } + Node *quals = query->jointree->quals; - int relationIndex = 1; - + Const *queryPartitionValueConst = NULL; List *prunedShardIntervalList = PruneShards(relationId, relationIndex, make_ands_implicit((Expr *) quals), &queryPartitionValueConst); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 8513b7d60..98d6f45a7 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -129,6 +129,33 @@ typedef struct RelationRowLock } RelationRowLock; +typedef struct DistributedPlanningContext +{ + /* The parsed query that is given to the planner. It is a slightly modified + * to work with the standard_planner */ + Query *query; + + /* A copy of the original parsed query that is given to the planner. This + * doesn't contain most of the changes that are made to parse. There's one + * that change that is made for non fast path router queries though, which + * is the assigning of RTE identities using AssignRTEIdentities. This is + * NULL for non distributed plans, since those don't need it. */ + Query *originalQuery; + + /* the cursor options given to the planner */ + int cursorOptions; + + /* the ParamListInfo that is given to the planner */ + ParamListInfo boundParams; + + /* Plan created either by standard_planner or by FastPathPlanner */ + PlannedStmt *plan; + + /* Our custom restriction context */ + PlannerRestrictionContext *plannerRestrictionContext; +} DistributedPlanningContext; + + extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern List * ExtractRangeTableEntryList(Query *query); @@ -150,5 +177,8 @@ extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); extern int32 BlessRecordExpression(Expr *expr); +extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); +extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, + struct DistributedPlan *distributedPlan); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 5cb5e6058..865ac0ce1 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -10,10 +10,12 @@ #define FUNCTION_CALL_DELEGATION_H #include "postgres.h" + +#include "distributed/distributed_planner.h" #include "distributed/multi_physical_planner.h" -DistributedPlan * TryToDelegateFunctionCall(Query *query, bool *hasParam); +PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); #endif /* FUNCTION_CALL_DELEGATION_H */