mirror of https://github.com/citusdata/citus.git
Skip shard pruning when possible
We're already traversing the queryTree and finding the distribution key value, so pass it to the later stages of the planning.pull/3332/head
parent
ca293116fa
commit
7f3ab7892d
|
@ -129,6 +129,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
int rteIdCounter = 1;
|
||||
bool fastPathRouterQuery = false;
|
||||
Const *distributionKeyValue = NULL;
|
||||
|
||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||
{
|
||||
|
@ -154,7 +155,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
fastPathRouterQuery = FastPathRouterQuery(parse);
|
||||
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -224,6 +225,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
{
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery =
|
||||
true;
|
||||
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue =
|
||||
distributionKeyValue;
|
||||
|
||||
result = FastPathPlanner(originalQuery, parse, boundParams);
|
||||
}
|
||||
|
|
|
@ -58,8 +58,10 @@
|
|||
bool EnableFastPathRouterPlanner = true;
|
||||
|
||||
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
|
||||
static bool ConjunctionContainsColumnFilter(Node *node, Var *column);
|
||||
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn);
|
||||
static bool ConjunctionContainsColumnFilter(Node *node, Var *column,
|
||||
Const **distributionKeyValue);
|
||||
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
|
||||
Const **distributionKeyValue);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -122,7 +124,9 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
|||
SeqScan *seqScanNode = makeNode(SeqScan);
|
||||
Plan *plan = &seqScanNode->plan;
|
||||
|
||||
AssertArg(FastPathRouterQuery(parse));
|
||||
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
|
||||
|
||||
AssertArg(FastPathRouterQuery(parse, &distKey));
|
||||
|
||||
/* there is only a single relation rte */
|
||||
seqScanNode->scanrelid = 1;
|
||||
|
@ -162,11 +166,12 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
|||
* and it should be ANDed with any other filters. Also, the distribution
|
||||
* key should only exists once in the WHERE clause. So basically,
|
||||
* SELECT ... FROM dist_table WHERE dist_key = X
|
||||
* If the filter is a const, distributionKeyValue is set
|
||||
* - All INSERT statements (including multi-row INSERTs) as long as the commands
|
||||
* don't have any sublinks/CTEs etc
|
||||
*/
|
||||
bool
|
||||
FastPathRouterQuery(Query *query)
|
||||
FastPathRouterQuery(Query *query, Const **distributionKeyValue)
|
||||
{
|
||||
FromExpr *joinTree = query->jointree;
|
||||
Node *quals = NULL;
|
||||
|
@ -254,7 +259,7 @@ FastPathRouterQuery(Query *query)
|
|||
* This is to simplify both of the individual checks and omit various edge cases
|
||||
* that might arise with multiple distribution keys in the quals.
|
||||
*/
|
||||
if (ConjunctionContainsColumnFilter(quals, distributionKey) &&
|
||||
if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
|
||||
!ColumnAppearsMultipleTimes(quals, distributionKey))
|
||||
{
|
||||
return true;
|
||||
|
@ -298,9 +303,11 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
|
|||
* ConjunctionContainsColumnFilter returns true if the query contains an exact
|
||||
* match (equal) expression on the provided column. The function returns true only
|
||||
* if the match expression has an AND relation with the rest of the expression tree.
|
||||
*
|
||||
* If the conjuction contains column filter which is const, distributionKeyValue is set.
|
||||
*/
|
||||
static bool
|
||||
ConjunctionContainsColumnFilter(Node *node, Var *column)
|
||||
ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKeyValue)
|
||||
{
|
||||
if (node == NULL)
|
||||
{
|
||||
|
@ -311,7 +318,7 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
|
|||
{
|
||||
OpExpr *opExpr = (OpExpr *) node;
|
||||
bool distKeyInSimpleOpExpression =
|
||||
DistKeyInSimpleOpExpression((Expr *) opExpr, column);
|
||||
DistKeyInSimpleOpExpression((Expr *) opExpr, column, distributionKeyValue);
|
||||
|
||||
if (!distKeyInSimpleOpExpression)
|
||||
{
|
||||
|
@ -342,7 +349,8 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
|
|||
{
|
||||
Node *argumentNode = (Node *) lfirst(argumentCell);
|
||||
|
||||
if (ConjunctionContainsColumnFilter(argumentNode, column))
|
||||
if (ConjunctionContainsColumnFilter(argumentNode, column,
|
||||
distributionKeyValue))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -357,9 +365,11 @@ ConjunctionContainsColumnFilter(Node *node, Var *column)
|
|||
* DistKeyInSimpleOpExpression checks whether given expression is a simple operator
|
||||
* expression with either (dist_key = param) or (dist_key = const). Note that the
|
||||
* operands could be in the reverse order as well.
|
||||
*
|
||||
* When a const is found, distributionKeyValue is set.
|
||||
*/
|
||||
static bool
|
||||
DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn)
|
||||
DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionKeyValue)
|
||||
{
|
||||
Node *leftOperand = NULL;
|
||||
Node *rightOperand = NULL;
|
||||
|
@ -420,6 +430,14 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn)
|
|||
|
||||
/* at this point we should have the columnInExpr */
|
||||
Assert(columnInExpr);
|
||||
bool distColumnExists = equal(distColumn, columnInExpr);
|
||||
if (distColumnExists && constantClause != NULL &&
|
||||
distColumn->vartype == constantClause->consttype &&
|
||||
*distributionKeyValue == NULL)
|
||||
{
|
||||
/* if the vartypes do not match, let shard pruning handle it later */
|
||||
*distributionKeyValue = copyObject(constantClause);
|
||||
}
|
||||
|
||||
return equal(distColumn, columnInExpr);
|
||||
return distColumnExists;
|
||||
}
|
||||
|
|
|
@ -1617,8 +1617,9 @@ ExtractFirstDistributedTableId(Query *query)
|
|||
List *rangeTableList = query->rtable;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
Oid distributedTableId = InvalidOid;
|
||||
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
|
||||
|
||||
Assert(IsModifyCommand(query) || FastPathRouterQuery(query));
|
||||
Assert(IsModifyCommand(query) || FastPathRouterQuery(query, &distKey));
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
|
@ -2026,9 +2027,32 @@ PlanRouterQuery(Query *originalQuery,
|
|||
*/
|
||||
if (fastPathRouterQuery)
|
||||
{
|
||||
List *shardIntervalList =
|
||||
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
|
||||
&isMultiShardQuery);
|
||||
List *shardIntervalList = NIL;
|
||||
Const *distributionKeyValue =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue;
|
||||
|
||||
if (distributionKeyValue)
|
||||
{
|
||||
Oid relationId = ExtractFirstDistributedTableId(originalQuery);
|
||||
DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId);
|
||||
ShardInterval *shardInterval =
|
||||
FindShardInterval(distributionKeyValue->constvalue, cache);
|
||||
|
||||
shardIntervalList = list_make1(shardInterval);
|
||||
|
||||
if (partitionValueConst != NULL)
|
||||
{
|
||||
/* set the outgoing partition column value if requested */
|
||||
*partitionValueConst = distributionKeyValue;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
shardIntervalList =
|
||||
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
|
||||
&isMultiShardQuery);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* This could only happen when there is a parameter on the distribution key.
|
||||
|
@ -2263,7 +2287,8 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
|
|||
&queryPartitionValueConst);
|
||||
|
||||
/* we're only expecting single shard from a single table */
|
||||
Assert(FastPathRouterQuery(query));
|
||||
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
|
||||
Assert(FastPathRouterQuery(query, &distKey));
|
||||
|
||||
if (list_length(prunedShardIntervalList) > 1)
|
||||
{
|
||||
|
|
|
@ -88,6 +88,14 @@ typedef struct JoinRestriction
|
|||
typedef struct FastPathRestrictionContext
|
||||
{
|
||||
bool fastPathRouterQuery;
|
||||
|
||||
/*
|
||||
* While calculating fastPathRouterQuery, we could sometimes be
|
||||
* able to extract the distribution key value as well (such as when
|
||||
* there are no prepared statements). Could be NULL when the distribution
|
||||
* key contains parameter, so check for it before using.
|
||||
*/
|
||||
Const *distributionKeyValue;
|
||||
}FastPathRestrictionContext;
|
||||
|
||||
typedef struct PlannerRestrictionContext
|
||||
|
|
|
@ -78,6 +78,6 @@ extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
|||
|
||||
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
|
||||
boundParams);
|
||||
extern bool FastPathRouterQuery(Query *query);
|
||||
extern bool FastPathRouterQuery(Query *query, Const **distributionKeyValue);
|
||||
|
||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||
|
|
|
@ -61,6 +61,7 @@ DELETE FROM modify_fast_path WHERE key = 1 and FALSE;
|
|||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
DETAIL: distribution column value: 1
|
||||
-- UPDATE may include complex target entries
|
||||
UPDATE modify_fast_path SET value_1 = value_1 + 12 * value_1 WHERE key = 1;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
|
|
Loading…
Reference in New Issue