Faster shard pruning.

Until now, Citus has used PostgreSQL's predicate-proving logic for shard
pruning, except for INSERT and COPY, which were already optimized for
speed.  That turns out to be too slow:
* Shard pruning for SELECTs is currently O(#shards), because
  PruneShardList calls predicate_refuted_by() for every
  shard. Obviously using an O(N) type algorithm for general pruning
  isn't good.
* predicate_refuted_by() is quite expensive in its own right. That's
  primarily because it's optimized for doing a single refutation
  proof, rather than performing the same proof over and over.
* predicate_refuted_by() does not keep persistent state (see the
  previous point) for function calls, which means that a lot of
  syscache lookups will be performed. That's particularly bad if the
  partitioning key is a composite key, because without a persistent
  FunctionCallInfo record_cmp() has to repeatedly look up the type
  definition of the composite key. That's quite expensive.

Thus replace this with custom code that works in two phases:
1) Search the restrictions for constraints that can be used for pruning
2) Use those restrictions to search for matching shards in the most
   efficient manner available:
   a) Binary search / Hash Lookup in case of hash partitioned tables
   b) Binary search for equality clauses in case of range or append
      tables without overlapping shards.
   c) Binary search for inequality clauses, searching for both lower
      and upper boundaries, again in case of range or append
      tables without overlapping shards.
   d) exhaustive search testing each ShardInterval

My measurements suggest that we are considerably, often orders of
magnitude, faster than the previous solution, even if we have to fall
back to exhaustive pruning.
support-6.1-faster-pruning
Andres Freund 2017-04-28 14:40:41 -07:00 committed by Jason Petersen
parent 096a1e3200
commit bb456d4002
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
12 changed files with 1373 additions and 336 deletions

View File

@ -67,6 +67,7 @@
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "executor/executor.h"
#include "tsearch/ts_locale.h"
#include "utils/builtins.h"

View File

@ -40,6 +40,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/worker_protocol.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
@ -81,7 +82,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
Node *queryTreeNode;
List *restrictClauseList = NIL;
bool failOK = false;
List *shardIntervalList = NIL;
List *prunedShardIntervalList = NIL;
List *taskList = NIL;
int32 affectedTupleCount = 0;
@ -156,11 +156,10 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
ExecuteMasterEvaluableFunctions(modifyQuery);
shardIntervalList = LoadShardIntervalList(relationId);
restrictClauseList = WhereClauseList(modifyQuery->jointree);
prunedShardIntervalList =
PruneShardList(relationId, tableId, restrictClauseList, shardIntervalList);
PruneShards(relationId, tableId, restrictClauseList);
CHECK_FOR_INTERRUPTS();

View File

@ -40,6 +40,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/task_tracker.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -131,9 +132,6 @@ static List * RangeTableFragmentsList(List *rangeTableList, List *whereClauseLis
static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
static List * BuildRestrictInfoList(List *qualList);
static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
List *dependedJobList);
static JoinSequenceNode * JoinSequenceArray(List *rangeTableFragmentsList,
@ -2044,7 +2042,6 @@ SubquerySqlTaskList(Job *job)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
Oid relationId = rangeTableEntry->relid;
List *shardIntervalList = LoadShardIntervalList(relationId);
List *finalShardIntervalList = NIL;
ListCell *fragmentCombinationCell = NULL;
ListCell *shardIntervalCell = NULL;
@ -2057,12 +2054,11 @@ SubquerySqlTaskList(Job *job)
Var *partitionColumn = PartitionColumn(relationId, tableId);
List *whereClauseList = ReplaceColumnsInOpExpressionList(opExpressionList,
partitionColumn);
finalShardIntervalList = PruneShardList(relationId, tableId, whereClauseList,
shardIntervalList);
finalShardIntervalList = PruneShards(relationId, tableId, whereClauseList);
}
else
{
finalShardIntervalList = shardIntervalList;
finalShardIntervalList = LoadShardIntervalList(relationId);
}
/* if all shards are pruned away, we return an empty task list */
@ -2499,11 +2495,8 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
Oid relationId = rangeTableEntry->relid;
ListCell *shardIntervalCell = NULL;
List *shardFragmentList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
List *prunedShardIntervalList = PruneShardList(relationId, tableId,
whereClauseList,
shardIntervalList);
List *prunedShardIntervalList = PruneShards(relationId, tableId,
whereClauseList);
/*
* If we prune all shards for one table, query results will be empty.
@ -2572,119 +2565,6 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
}
/*
 * PruneShardList filters the given shard intervals against the query's
 * selection criteria and returns, as a separate list, the intervals that
 * could not be refuted.
 *
 * Reference tables are never pruned: the input list is handed back as is.
 * A WHERE clause list containing a constant false yields an empty result.
 */
List *
PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
			   List *shardIntervalList)
{
	List *survivingShards = NIL;
	List *restrictions = NIL;
	Node *rangeConstraint = NULL;
	ListCell *cell = NULL;
	Var *pruningColumn = PartitionColumn(relationId, tableId);
	char method = PartitionMethod(relationId);

	/* reference tables: nothing to prune */
	if (method == DISTRIBUTE_BY_NONE)
	{
		return shardIntervalList;
	}

	/* WHERE false (AND ...) can never match any shard */
	if (ContainsFalseClause(whereClauseList))
	{
		return NIL;
	}

	if (method == DISTRIBUTE_BY_HASH)
	{
		/*
		 * For hash partitioning the clauses on the partition column are
		 * rewritten into their hashed form, and from here on the int4
		 * stand-in column replaces the real partition column.
		 */
		List *hashedClauses =
			(List *) HashableClauseMutator((Node *) whereClauseList,
										   pruningColumn);

		restrictions = BuildRestrictInfoList(hashedClauses);
		pruningColumn = MakeInt4Column();
	}
	else
	{
		restrictions = BuildRestrictInfoList(whereClauseList);
	}

	/* template constraint; its min/max values are filled in per shard below */
	rangeConstraint = BuildBaseConstraint(pruningColumn);

	foreach(cell, shardIntervalList)
	{
		ShardInterval *interval = (ShardInterval *) lfirst(cell);
		bool refuted = false;

		/* shards lacking either bound cannot be tested and are always kept */
		if (interval->minValueExists && interval->maxValueExists)
		{
			UpdateConstraint(rangeConstraint, interval);
			refuted = predicate_refuted_by(list_make1(rangeConstraint),
										   restrictions);
		}

		if (!refuted)
		{
			survivingShards = lappend(survivingShards, interval);
			continue;
		}

		ereport(DEBUG2, (errmsg("predicate pruning for shardId "
								UINT64_FORMAT, interval->shardId)));
	}

	return survivingShards;
}
/*
 * ContainsFalseClause reports whether any entry of the flattened WHERE
 * clause list is a boolean constant whose value is false.
 */
bool
ContainsFalseClause(List *whereClauseList)
{
	ListCell *cell = NULL;

	foreach(cell, whereClauseList)
	{
		Node *node = (Node *) lfirst(cell);
		Const *boolConst = NULL;

		/* only constants are of interest */
		if (!IsA(node, Const))
		{
			continue;
		}

		boolConst = (Const *) node;
		if (boolConst->consttype != BOOLOID)
		{
			continue;
		}

		if (!DatumGetBool(boolConst->constvalue))
		{
			return true;
		}
	}

	return false;
}
/*
* BuildBaseConstraint builds and returns a base constraint. This constraint
* implements an expression in the form of (column <= max && column >= min),
@ -2907,87 +2787,6 @@ SimpleOpExpression(Expr *clause)
}
/*
 * HashableClauseMutator is an expression tree mutator: operator expressions
 * that can be hashed are swapped for their hashed counterparts, every other
 * node is copied through unchanged.
 */
static Node *
HashableClauseMutator(Node *originalNode, Var *partitionColumn)
{
	if (originalNode == NULL)
	{
		return NULL;
	}

	if (IsA(originalNode, OpExpr))
	{
		OpExpr *opExpr = (OpExpr *) originalNode;
		Oid leftHashProc = InvalidOid;
		Oid rightHashProc = InvalidOid;
		bool onPartitionColumn = false;

		/*
		 * get_op_hash_functions only succeeds when opno is the registered
		 * '=' operator of some hash opfamily, so this single call verifies
		 * both that a hash function exists for the operand types and that
		 * the clause is an equality.
		 */
		bool hashable = get_op_hash_functions(opExpr->opno,
											  &leftHashProc,
											  &rightHashProc);

		if (SimpleOpExpression((Expr *) opExpr))
		{
			onPartitionColumn = OpExpressionContainsColumn(opExpr,
														   partitionColumn);
		}

		if (hashable && onPartitionColumn)
		{
			return (Node *) MakeHashedOperatorExpression(opExpr);
		}
	}
	else if (IsA(originalNode, ScalarArrayOpExpr))
	{
		ScalarArrayOpExpr *arrayOpExpr = (ScalarArrayOpExpr *) originalNode;
		Node *leftArg = linitial(arrayOpExpr->args);
		Node *strippedLeftArg = strip_implicit_coercions(leftArg);
		bool isEquality = OperatorImplementsEquality(arrayOpExpr->opno);

		/*
		 * ANY/ALL cannot be used to prune hash-distributed shards. Tell the
		 * user when such an equality is applied to the partition column.
		 */
		if (isEquality && strippedLeftArg != NULL &&
			equal(strippedLeftArg, partitionColumn))
		{
			ereport(NOTICE, (errmsg("cannot use shard pruning with "
									"ANY/ALL (array expression)"),
							 errhint("Consider rewriting the expression with "
									 "OR/AND clauses.")));
		}
	}

	/* not hashable itself: recurse so eligible clauses further down get hashed */
	return expression_tree_mutator(originalNode, HashableClauseMutator,
								   (void *) partitionColumn);
}
/*
* OpExpressionContainsColumn checks if the operator expression contains the
* given partition column. We assume that given operator expression is a simple
@ -3018,77 +2817,6 @@ OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
}
/*
 * MakeHashedOperatorExpression builds a fresh int4 equality expression of
 * the form (int4 column = hash(constant)), where the constant is taken from
 * the given operator expression and run through its type's hash function.
 */
static OpExpr *
MakeHashedOperatorExpression(OpExpr *operatorExpression)
{
	const Oid hashResultTypeId = INT4OID;
	Node *leftArg = get_leftop((Expr *) operatorExpression);
	Node *rightArg = get_rightop((Expr *) operatorExpression);
	TypeCacheEntry *int4Entry = NULL;
	TypeCacheEntry *constTypeEntry = NULL;
	FmgrInfo *hashProc = NULL;
	Oid int4EqualsOperator = InvalidOid;
	Var *int4Column = NULL;
	Const *hashedConst = NULL;
	Datum hashDatum = 0;
	OpExpr *result = NULL;

	/*
	 * Take the constant from whichever side holds it.
	 * NOTE(review): when the right operand is not a Const the left one is
	 * cast without a check; callers appear to only pass simple
	 * column-vs-constant expressions -- confirm.
	 */
	Const *sourceConst = IsA(rightArg, Const) ? (Const *) rightArg
											  : (Const *) leftArg;

	/* equality operator for the hash result type, from the type cache */
	int4Entry = lookup_type_cache(hashResultTypeId, TYPECACHE_EQ_OPR);
	int4EqualsOperator = int4Entry->eq_opr;

	/* int4 column that replaces the partition column */
	int4Column = MakeInt4Column();

	/* hash function for the constant's type, from the type cache */
	constTypeEntry = lookup_type_cache(sourceConst->consttype,
									   TYPECACHE_HASH_PROC_FINFO);
	hashProc = &(constTypeEntry->hash_proc_finfo);
	if (!OidIsValid(hashProc->fn_oid))
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
						errmsg("could not identify a hash function for type %s",
							   format_type_be(sourceConst->consttype)),
						errdatatype(sourceConst->consttype)));
	}

	/*
	 * The value produced here depends on PostgreSQL's hashing functions; any
	 * change to them changes the constant generated by this function.
	 */
	hashDatum = FunctionCall1(hashProc, sourceConst->constvalue);
	hashedConst = MakeInt4Constant(hashDatum);

	/* assemble (int4 column = hashed constant) */
	result = (OpExpr *) make_opclause(int4EqualsOperator,
									  InvalidOid, /* no result type yet */
									  false, /* no return set */
									  (Expr *) int4Column,
									  (Expr *) hashedConst,
									  InvalidOid, InvalidOid);

	/* fill in the implementing function and its result type */
	result->opfuncid = get_opcode(int4EqualsOperator);
	result->opresulttype = get_func_rettype(result->opfuncid);

	return result;
}
/*
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
@ -3130,30 +2858,6 @@ MakeInt4Constant(Datum constantValue)
}
/*
 * BuildRestrictInfoList wraps every qualifier of the given list in a simple
 * RestrictInfo and returns the resulting list, in the same order. For now
 * the function assumes that only a single relation is involved.
 */
static List *
BuildRestrictInfoList(List *qualList)
{
	List *result = NIL;
	ListCell *cell = NULL;

	foreach(cell, qualList)
	{
		Expr *qual = (Expr *) lfirst(cell);

		result = lappend(result, make_simple_restrictinfo(qual));
	}

	return result;
}
/* Updates the base constraint with the given min/max values. */
void
UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval)

View File

@ -39,6 +39,7 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "executor/execdesc.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
@ -533,8 +534,13 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* items in it. The list consists of shard interval ranges with hashed columns
* such as (hashColumn >= shardMinValue) and (hashedColumn <= shardMaxValue).
*
* The function errors out if the given shard interval does not belong to a hash
* distributed table.
* The function returns hashed columns generated by MakeInt4Column() for the hash
* partitioned tables in place of partition columns.
*
* The function errors out if the given shard interval does not belong to a hash,
* range or append distributed table.
*
* NB: If you update this, also look at PrunableExpressionsWalker().
*/
static List *
HashedShardIntervalOpExpressions(ShardInterval *shardInterval)
@ -2070,10 +2076,8 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
{
List *restrictClauseList = QueryRestrictList(query);
Index tableId = 1;
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList);
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
}
prunedShardCount = list_length(prunedShardList);
@ -2467,7 +2471,6 @@ TargetShardIntervalsForSelect(Query *query,
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *prunedShardList = NIL;
int shardIndex = 0;
List *joinInfoList = relationRestriction->relOptInfo->joininfo;
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
bool whereFalseQuery = false;
@ -2483,18 +2486,8 @@ TargetShardIntervalsForSelect(Query *query,
whereFalseQuery = ContainsFalseClause(pseudoRestrictionList);
if (!whereFalseQuery && shardCount > 0)
{
List *shardIntervalList = NIL;
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
ShardInterval *shardInterval =
cacheEntry->sortedShardIntervalArray[shardIndex];
shardIntervalList = lappend(shardIntervalList, shardInterval);
}
prunedShardList = PruneShardList(relationId, tableId,
restrictClauseList,
shardIntervalList);
prunedShardList = PruneShards(relationId, tableId,
restrictClauseList);
/*
* Quick bail out. The query can not be router plannable if one

File diff suppressed because it is too large Load Diff

View File

@ -24,6 +24,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
#include "distributed/shard_pruning.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/nodes.h"
@ -203,11 +204,11 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
Oid shardIdTypeId = INT8OID;
Index tableId = 1;
List *shardList = LoadShardIntervalList(distributedTableId);
List *shardList = NIL;
int shardIdCount = -1;
Datum *shardIdDatumArray = NULL;
shardList = PruneShardList(distributedTableId, tableId, whereClauseList, shardList);
shardList = PruneShards(distributedTableId, tableId, whereClauseList);
shardIdCount = list_length(shardList);
shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));

View File

@ -16,6 +16,7 @@
#include "catalog/pg_type.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h"
#include "distributed/shard_pruning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_protocol.h"
@ -23,7 +24,6 @@
#include "utils/memutils.h"
static int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
static int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
@ -254,7 +254,7 @@ FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry)
* somewhere. Such as a hash function which returns a value not in the range
* of [INT32_MIN, INT32_MAX] can fire this.
*/
static int
int
FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
{
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;

View File

@ -249,10 +249,6 @@ extern StringInfo ShardFetchQueryString(uint64 shardId);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
/* Function declarations for shard pruning */
extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
List *shardList);
extern bool ContainsFalseClause(List *whereClauseList);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
/*

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* shard_pruning.h
* Shard pruning infrastructure.
*
* Copyright (c) 2014-2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARD_PRUNING_H_
#define SHARD_PRUNING_H_
#include "distributed/metadata_cache.h"
#include "nodes/primnodes.h"
/*
 * Sentinel shard index.
 * NOTE(review): presumably means "no matching shard" in index lookups --
 * confirm against the users of this macro.
 */
#define INVALID_SHARD_INDEX -1
/* Function declarations for shard pruning */
/*
 * PruneShards returns the list of shards of relationId that remain after
 * pruning with whereClauseList; rangeTableId is the range table index the
 * partition column is looked up under. Unlike the older PruneShardList, the
 * caller does not pass in a shard interval list.
 * NOTE(review): list elements are presumably ShardInterval pointers --
 * confirm against the implementation.
 */
extern List * PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList);
/*
 * ContainsFalseClause returns whether the flattened WHERE clause list
 * contains a constant false as one of its clauses.
 */
extern bool ContainsFalseClause(List *whereClauseList);
#endif /* SHARD_PRUNING_H_ */

View File

@ -35,6 +35,7 @@ extern int CompareRelationShards(const void *leftElement,
extern int ShardIndex(ShardInterval *shardInterval);
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
DistTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
extern bool SingleReplicatedTable(Oid relationId);
#endif /* SHARDINTERVAL_UTILS_H_ */

View File

@ -70,21 +70,21 @@ SELECT prune_using_single_value('pruning', NULL);
SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
prune_using_either_value
--------------------------
{800001,800002}
{800002,800001}
(1 row)
-- an AND clause with incompatible values returns no shards
-- an AND clause with values on different shards returns no shards
SELECT prune_using_both_values('pruning', 'tomato', 'petunia');
prune_using_both_values
-------------------------
{}
(1 row)
-- but if both values are on the same shard, should get back that shard
-- even if both values are on the same shard, a value can't be equal to two others
SELECT prune_using_both_values('pruning', 'tomato', 'rose');
prune_using_both_values
-------------------------
{800002}
{}
(1 row)
-- unit test of the equality expression generation code

View File

@ -60,10 +60,10 @@ SELECT prune_using_single_value('pruning', NULL);
-- build an OR clause and expect more than one sahrd
SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
-- an AND clause with incompatible values returns no shards
-- an AND clause with values on different shards returns no shards
SELECT prune_using_both_values('pruning', 'tomato', 'petunia');
-- but if both values are on the same shard, should get back that shard
-- even if both values are on the same shard, a value can't be equal to two others
SELECT prune_using_both_values('pruning', 'tomato', 'rose');
-- unit test of the equality expression generation code