Merge pull request #1331 from citusdata/feature/faster-pruning

Faster Shard Pruning Implementation
pull/1365/head
Andres Freund 2017-04-28 15:01:41 -07:00 committed by GitHub
commit 4094b45ba9
18 changed files with 1543 additions and 406 deletions
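
In short, call sites stop loading every shard interval up front; the new PruneShards entry point looks the shard intervals up itself. A minimal before/after sketch of the call-site change, using the variable names from the first hunk below:

/* before: load all shard intervals, then prune the list */
shardIntervalList = LoadShardIntervalList(relationId);
prunedShardIntervalList =
    PruneShardList(relationId, tableId, restrictClauseList, shardIntervalList);

/* after: PruneShards looks the shard intervals up itself */
prunedShardIntervalList =
    PruneShards(relationId, tableId, restrictClauseList);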

View File

@ -67,6 +67,7 @@
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "tsearch/ts_locale.h"

View File

@ -40,6 +40,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/worker_protocol.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
@ -81,7 +82,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
Node *queryTreeNode;
List *restrictClauseList = NIL;
bool failOK = false;
List *shardIntervalList = NIL;
List *prunedShardIntervalList = NIL;
List *taskList = NIL;
int32 affectedTupleCount = 0;
@ -156,11 +156,10 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
ExecuteMasterEvaluableFunctions(modifyQuery, NULL);
shardIntervalList = LoadShardIntervalList(relationId);
restrictClauseList = WhereClauseList(modifyQuery->jointree);
prunedShardIntervalList =
PruneShardList(relationId, tableId, restrictClauseList, shardIntervalList);
PruneShards(relationId, tableId, restrictClauseList);
CHECK_FOR_INTERRUPTS();

View File

@ -41,6 +41,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "distributed/task_tracker.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -133,9 +134,6 @@ static List * RangeTableFragmentsList(List *rangeTableList, List *whereClauseLis
static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
static List * BuildRestrictInfoList(List *qualList);
static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
List *dependedJobList);
static JoinSequenceNode * JoinSequenceArray(List *rangeTableFragmentsList,
@ -2060,7 +2058,6 @@ SubquerySqlTaskList(Job *job)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
Oid relationId = rangeTableEntry->relid;
List *shardIntervalList = LoadShardIntervalList(relationId);
List *finalShardIntervalList = NIL;
ListCell *fragmentCombinationCell = NULL;
ListCell *shardIntervalCell = NULL;
@ -2073,12 +2070,11 @@ SubquerySqlTaskList(Job *job)
Var *partitionColumn = PartitionColumn(relationId, tableId);
List *whereClauseList = ReplaceColumnsInOpExpressionList(opExpressionList,
partitionColumn);
finalShardIntervalList = PruneShardList(relationId, tableId, whereClauseList,
shardIntervalList);
finalShardIntervalList = PruneShards(relationId, tableId, whereClauseList);
}
else
{
finalShardIntervalList = shardIntervalList;
finalShardIntervalList = LoadShardIntervalList(relationId);
}
/* if all shards are pruned away, we return an empty task list */
@ -2513,11 +2509,8 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
Oid relationId = rangeTableEntry->relid;
ListCell *shardIntervalCell = NULL;
List *shardFragmentList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
List *prunedShardIntervalList = PruneShardList(relationId, tableId,
whereClauseList,
shardIntervalList);
List *prunedShardIntervalList = PruneShards(relationId, tableId,
whereClauseList);
/*
* If we prune all shards for one table, query results will be empty.
@ -2586,114 +2579,6 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
}
/*
* PruneShardList prunes shard intervals from the given list based on the selection criteria,
* and returns the remaining shard intervals in another list.
*
* For reference tables, the function simply returns the single shard that the table has.
*/
List *
PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
List *shardIntervalList)
{
List *remainingShardList = NIL;
ListCell *shardIntervalCell = NULL;
List *restrictInfoList = NIL;
Node *baseConstraint = NULL;
Var *partitionColumn = PartitionColumn(relationId, tableId);
char partitionMethod = PartitionMethod(relationId);
/* short circuit for reference tables */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
return shardIntervalList;
}
if (ContainsFalseClause(whereClauseList))
{
/* always return empty result if WHERE clause is of the form: false (AND ..) */
return NIL;
}
/* build the filter clause list for the partition method */
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
Node *hashedNode = HashableClauseMutator((Node *) whereClauseList,
partitionColumn);
List *hashedClauseList = (List *) hashedNode;
restrictInfoList = BuildRestrictInfoList(hashedClauseList);
}
else
{
restrictInfoList = BuildRestrictInfoList(whereClauseList);
}
/* override the partition column for hash partitioning */
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
partitionColumn = MakeInt4Column();
}
/* build the base expression for constraint */
baseConstraint = BuildBaseConstraint(partitionColumn);
/* walk over shard list and check if shards can be pruned */
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
List *constraintList = NIL;
bool shardPruned = false;
if (shardInterval->minValueExists && shardInterval->maxValueExists)
{
/* set the min/max values in the base constraint */
UpdateConstraint(baseConstraint, shardInterval);
constraintList = list_make1(baseConstraint);
shardPruned = predicate_refuted_by(constraintList, restrictInfoList);
}
if (!shardPruned)
{
remainingShardList = lappend(remainingShardList, shardInterval);
}
}
return remainingShardList;
}
/*
* ContainsFalseClause returns whether the flattened where clause list
* contains false as a clause.
*/
bool
ContainsFalseClause(List *whereClauseList)
{
bool containsFalseClause = false;
ListCell *clauseCell = NULL;
foreach(clauseCell, whereClauseList)
{
Node *clause = (Node *) lfirst(clauseCell);
if (IsA(clause, Const))
{
Const *constant = (Const *) clause;
if (constant->consttype == BOOLOID && !DatumGetBool(constant->constvalue))
{
containsFalseClause = true;
break;
}
}
}
return containsFalseClause;
}
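
ContainsFalseClause only recognizes a bare boolean false constant, which is what the short-circuit in PruneShardList above relies on. A small hedged usage sketch (hypothetical caller inside the backend, using makeBoolConst from nodes/makefuncs.h):

/* a WHERE clause list that has been constant-folded down to "false" */
Node *falseConst = makeBoolConst(false, false);
List *whereClauseList = list_make1(falseConst);

Assert(ContainsFalseClause(whereClauseList)); /* pruning then returns NIL */
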
/*
* BuildBaseConstraint builds and returns a base constraint. This constraint
* implements an expression in the form of (column <= max && column >= min),
@ -2916,87 +2801,6 @@ SimpleOpExpression(Expr *clause)
}
/*
* HashableClauseMutator walks over the original where clause list, replaces
* hashable nodes with hashed versions and keeps other nodes as they are.
*/
static Node *
HashableClauseMutator(Node *originalNode, Var *partitionColumn)
{
Node *newNode = NULL;
if (originalNode == NULL)
{
return NULL;
}
if (IsA(originalNode, OpExpr))
{
OpExpr *operatorExpression = (OpExpr *) originalNode;
bool hasPartitionColumn = false;
Oid leftHashFunction = InvalidOid;
Oid rightHashFunction = InvalidOid;
/*
* If operatorExpression->opno is NOT the registered '=' operator for
* any hash opfamilies, then get_op_hash_functions will return false.
* This means this function both ensures a hash function exists for the
* types in question AND filters out any clauses lacking equality ops.
*/
bool hasHashFunction = get_op_hash_functions(operatorExpression->opno,
&leftHashFunction,
&rightHashFunction);
bool simpleOpExpression = SimpleOpExpression((Expr *) operatorExpression);
if (simpleOpExpression)
{
hasPartitionColumn = OpExpressionContainsColumn(operatorExpression,
partitionColumn);
}
if (hasHashFunction && hasPartitionColumn)
{
OpExpr *hashedOperatorExpression =
MakeHashedOperatorExpression((OpExpr *) originalNode);
newNode = (Node *) hashedOperatorExpression;
}
}
else if (IsA(originalNode, ScalarArrayOpExpr))
{
ScalarArrayOpExpr *arrayOperatorExpression = (ScalarArrayOpExpr *) originalNode;
Node *leftOpExpression = linitial(arrayOperatorExpression->args);
Node *strippedLeftOpExpression = strip_implicit_coercions(leftOpExpression);
bool usingEqualityOperator = OperatorImplementsEquality(
arrayOperatorExpression->opno);
/*
* Citus cannot prune hash-distributed shards with ANY/ALL. We emit a NOTICE
* if an ANY/ALL expression uses equality on the partition column.
*/
if (usingEqualityOperator && strippedLeftOpExpression != NULL &&
equal(strippedLeftOpExpression, partitionColumn))
{
ereport(NOTICE, (errmsg("cannot use shard pruning with "
"ANY/ALL (array expression)"),
errhint("Consider rewriting the expression with "
"OR/AND clauses.")));
}
}
/*
* If this node is not hashable, continue walking down the expression tree
* to find and hash clauses which are eligible.
*/
if (newNode == NULL)
{
newNode = expression_tree_mutator(originalNode, HashableClauseMutator,
(void *) partitionColumn);
}
return newNode;
}
/*
* OpExpressionContainsColumn checks if the operator expression contains the
* given partition column. We assume that the given operator expression is a simple
@ -3027,77 +2831,6 @@ OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
}
/*
* MakeHashedOperatorExpression creates a new operator expression with a column
* of int4 type and hashed constant value.
*/
static OpExpr *
MakeHashedOperatorExpression(OpExpr *operatorExpression)
{
const Oid hashResultTypeId = INT4OID;
TypeCacheEntry *hashResultTypeEntry = NULL;
Oid operatorId = InvalidOid;
OpExpr *hashedExpression = NULL;
Var *hashedColumn = NULL;
Datum hashedValue = 0;
Const *hashedConstant = NULL;
FmgrInfo *hashFunction = NULL;
TypeCacheEntry *typeEntry = NULL;
Node *leftOperand = get_leftop((Expr *) operatorExpression);
Node *rightOperand = get_rightop((Expr *) operatorExpression);
Const *constant = NULL;
if (IsA(rightOperand, Const))
{
constant = (Const *) rightOperand;
}
else
{
constant = (Const *) leftOperand;
}
/* Load the operator from type cache */
hashResultTypeEntry = lookup_type_cache(hashResultTypeId, TYPECACHE_EQ_OPR);
operatorId = hashResultTypeEntry->eq_opr;
/* Get a column with int4 type */
hashedColumn = MakeInt4Column();
/* Load the hash function from type cache */
typeEntry = lookup_type_cache(constant->consttype, TYPECACHE_HASH_PROC_FINFO);
hashFunction = &(typeEntry->hash_proc_finfo);
if (!OidIsValid(hashFunction->fn_oid))
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify a hash function for type %s",
format_type_be(constant->consttype)),
errdatatype(constant->consttype)));
}
/*
* Note that any changes to PostgreSQL's hashing functions will change the
* new value created by this function.
*/
hashedValue = FunctionCall1(hashFunction, constant->constvalue);
hashedConstant = MakeInt4Constant(hashedValue);
/* Now create the expression with modified partition column and hashed constant */
hashedExpression = (OpExpr *) make_opclause(operatorId,
InvalidOid, /* no result type yet */
false, /* no return set */
(Expr *) hashedColumn,
(Expr *) hashedConstant,
InvalidOid, InvalidOid);
/* Set implementing function id and result type */
hashedExpression->opfuncid = get_opcode(operatorId);
hashedExpression->opresulttype = get_func_rettype(hashedExpression->opfuncid);
return hashedExpression;
}
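
The constant-hashing step itself presumably moves into the new shard_pruning.c (its diff is suppressed above). Condensed from the deleted function, the transformation for a hash-distributed table looks like this sketch, assuming constant is a Const of a hashable type:

/* look up the type's hash function and hash the comparison constant */
TypeCacheEntry *typeEntry = lookup_type_cache(constant->consttype,
                                              TYPECACHE_HASH_PROC_FINFO);
FmgrInfo *hashFunction = &(typeEntry->hash_proc_finfo);

Datum hashedValue = FunctionCall1(hashFunction, constant->constvalue);
Const *hashedConstant = MakeInt4Constant(hashedValue);

/* shard bounds of hash-distributed tables are int4 hash ranges, so the
 * hashed constant can be compared against shardminvalue/shardmaxvalue */
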
/*
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
@ -3139,30 +2872,6 @@ MakeInt4Constant(Datum constantValue)
}
/*
* BuildRestrictInfoList builds a restrict info list using the selection criteria,
* and then returns this list. Note that this function assumes there is only one
* relation for now.
*/
static List *
BuildRestrictInfoList(List *qualList)
{
List *restrictInfoList = NIL;
ListCell *qualCell = NULL;
foreach(qualCell, qualList)
{
RestrictInfo *restrictInfo = NULL;
Node *qualNode = (Node *) lfirst(qualCell);
restrictInfo = make_simple_restrictinfo((Expr *) qualNode);
restrictInfoList = lappend(restrictInfoList, restrictInfo);
}
return restrictInfoList;
}
/* Updates the base constraint with the given min/max values. */
void
UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval)

View File

@ -41,6 +41,7 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_pruning.h"
#include "executor/execdesc.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
@ -558,6 +559,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
*
* The function errors out if the given shard interval does not belong to a hash-,
* range- or append-distributed table.
*
* NB: If you update this, also look at PrunableExpressionsWalker().
*/
static List *
ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex)
@ -1998,9 +2001,7 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
restrictClauseList = list_make1(equalityExpr);
shardIntervalList = LoadShardIntervalList(distributedTableId);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList);
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
}
prunedShardCount = list_length(prunedShardList);
@ -2060,7 +2061,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod;
CmdType commandType = query->commandType;
List *shardIntervalList = NIL;
List *restrictClauseList = NIL;
Index tableId = 1;
List *prunedShardList = NIL;
@ -2068,11 +2068,8 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE);
shardIntervalList = LoadShardIntervalList(distributedTableId);
restrictClauseList = QueryRestrictList(query, partitionMethod);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList);
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
@ -2412,7 +2409,6 @@ TargetShardIntervalsForSelect(Query *query,
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *prunedShardList = NIL;
int shardIndex = 0;
List *joinInfoList = relationRestriction->relOptInfo->joininfo;
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
bool whereFalseQuery = false;
@ -2428,18 +2424,8 @@ TargetShardIntervalsForSelect(Query *query,
whereFalseQuery = ContainsFalseClause(pseudoRestrictionList);
if (!whereFalseQuery && shardCount > 0)
{
List *shardIntervalList = NIL;
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
ShardInterval *shardInterval =
cacheEntry->sortedShardIntervalArray[shardIndex];
shardIntervalList = lappend(shardIntervalList, shardInterval);
}
prunedShardList = PruneShardList(relationId, tableId,
restrictClauseList,
shardIntervalList);
prunedShardList = PruneShards(relationId, tableId,
restrictClauseList);
/*
* Quick bail out. The query cannot be router plannable if one

File diff suppressed because it is too large

View File

@ -24,6 +24,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
#include "distributed/shard_pruning.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/nodes.h"
@ -203,11 +204,11 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
Oid shardIdTypeId = INT8OID;
Index tableId = 1;
List *shardList = LoadShardIntervalList(distributedTableId);
List *shardList = NIL;
int shardIdCount = -1;
Datum *shardIdDatumArray = NULL;
shardList = PruneShardList(distributedTableId, tableId, whereClauseList, shardList);
shardList = PruneShards(distributedTableId, tableId, whereClauseList);
shardIdCount = list_length(shardList);
shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));

View File

@ -134,8 +134,6 @@ static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
char partitionMethod);
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
int shardCount,
FmgrInfo *
@ -147,6 +145,9 @@ static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArr
static void ErrorIfInstalledVersionMismatch(void);
static char * AvailableExtensionVersion(void);
static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
FmgrInfo *shardIntervalSortCompareFunction);
static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
@ -158,6 +159,7 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation
static List * LookupDistShardTuples(Oid relationId);
static Oid LookupShardRelation(int64 shardId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Oid *columnTypeId, int32 *columnTypeMod,
Oid *intervalTypeId, int32 *intervalTypeMod);
static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
TupleDesc tupleDescriptor, Oid intervalTypeId,
@ -619,9 +621,21 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
ShardInterval **shardIntervalArray = NULL;
ShardInterval **sortedShardIntervalArray = NULL;
FmgrInfo *shardIntervalCompareFunction = NULL;
FmgrInfo *shardColumnCompareFunction = NULL;
List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0;
int shardIndex = 0;
Oid columnTypeId = InvalidOid;
int32 columnTypeMod = -1;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;
GetPartitionTypeInputInfo(cacheEntry->partitionKeyString,
cacheEntry->partitionMethod,
&columnTypeId,
&columnTypeMod,
&intervalTypeId,
&intervalTypeMod);
distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
shardIntervalArrayLength = list_length(distShardTupleList);
@ -631,13 +645,6 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
ListCell *distShardTupleCell = NULL;
int arrayIndex = 0;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;
GetPartitionTypeInputInfo(cacheEntry->partitionKeyString,
cacheEntry->partitionMethod,
&intervalTypeId,
&intervalTypeMod);
shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
@ -676,29 +683,41 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
heap_close(distShardRelation, AccessShareLock);
}
/* decide and allocate interval comparison function */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
/* look up value comparison function */
if (columnTypeId != InvalidOid)
{
/* allocate the comparison function in the cache context */
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
shardColumnCompareFunction = GetFunctionInfo(columnTypeId, BTREE_AM_OID,
BTORDER_PROC);
MemoryContextSwitchTo(oldContext);
}
else
{
shardColumnCompareFunction = NULL;
}
/* look up interval comparison function */
if (intervalTypeId != InvalidOid)
{
/* allocate the comparison function in the cache context */
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
shardIntervalCompareFunction = GetFunctionInfo(intervalTypeId, BTREE_AM_OID,
BTORDER_PROC);
MemoryContextSwitchTo(oldContext);
}
else
{
shardIntervalCompareFunction = NULL;
}
else if (shardIntervalArrayLength > 0)
{
MemoryContext oldContext = CurrentMemoryContext;
/* allocate the comparison function in the cache context */
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
shardIntervalCompareFunction =
ShardIntervalCompareFunction(shardIntervalArray,
cacheEntry->partitionMethod);
MemoryContextSwitchTo(oldContext);
}
/* reference tables have a single shard which is not initialized */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
cacheEntry->hasUninitializedShardInterval = true;
cacheEntry->hasOverlappingShardInterval = true;
/*
* Note that during create_reference_table() call,
@ -727,6 +746,35 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
cacheEntry->hasUninitializedShardInterval =
HasUninitializedShardInterval(sortedShardIntervalArray,
shardIntervalArrayLength);
if (!cacheEntry->hasUninitializedShardInterval)
{
cacheEntry->hasOverlappingShardInterval =
HasOverlappingShardInterval(sortedShardIntervalArray,
shardIntervalArrayLength,
shardIntervalCompareFunction);
}
else
{
cacheEntry->hasOverlappingShardInterval = true;
}
/*
* If the table is hash-partitioned and has shards, there should never be
* any uninitialized shards. Historically we've not prevented that for
* range-partitioned tables, but it might be a good idea to start
* doing so.
*/
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards")));
}
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasOverlappingShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has overlapping shards")));
}
}
@ -794,41 +842,11 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
}
/*
* ShardIntervalCompareFunction returns the appropriate compare function for the
* partition column type. In case of hash-partitioning, it always returns the compare
* function for integers. Callers of this function have to ensure that shardIntervalArray
* has at least one element.
*/
static FmgrInfo *
ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod)
{
FmgrInfo *shardIntervalCompareFunction = NULL;
Oid comparisonTypeId = InvalidOid;
Assert(shardIntervalArray != NULL);
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
comparisonTypeId = INT4OID;
}
else
{
ShardInterval *shardInterval = shardIntervalArray[0];
comparisonTypeId = shardInterval->valueTypeId;
}
shardIntervalCompareFunction = GetFunctionInfo(comparisonTypeId, BTREE_AM_OID,
BTORDER_PROC);
return shardIntervalCompareFunction;
}
/*
* SortShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
* no min/max values are placed at the end of the array.
@ -932,6 +950,52 @@ HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shar
}
/*
* HasOverlappingShardInterval determines whether the given list of sorted
* shards has overlapping ranges.
*/
static bool
HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
FmgrInfo *shardIntervalSortCompareFunction)
{
int shardIndex = 0;
ShardInterval *lastShardInterval = NULL;
Datum comparisonDatum = 0;
int comparisonResult = 0;
/* zero or one shard can't overlap */
if (shardIntervalArrayLength < 2)
{
return false;
}
lastShardInterval = shardIntervalArray[0];
for (shardIndex = 1; shardIndex < shardIntervalArrayLength; shardIndex++)
{
ShardInterval *curShardInterval = shardIntervalArray[shardIndex];
/* only called if !hasUninitializedShardInterval */
Assert(lastShardInterval->minValueExists && lastShardInterval->maxValueExists);
Assert(curShardInterval->minValueExists && curShardInterval->maxValueExists);
comparisonDatum = CompareCall2(shardIntervalSortCompareFunction,
lastShardInterval->maxValue,
curShardInterval->minValue);
comparisonResult = DatumGetInt32(comparisonDatum);
if (comparisonResult >= 0)
{
return true;
}
lastShardInterval = curShardInterval;
}
return false;
}
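
Because the array arrives sorted by minimum value, one pairwise pass is enough: two sorted intervals overlap exactly when the earlier one's maximum reaches the later one's minimum. The same check on plain integer ranges, as a self-contained sketch (hypothetical Range type, not Citus API):

typedef struct { int min; int max; } Range;

/* assumes "sorted" is ordered by min; returns true on any overlap */
static bool
HasOverlap(Range *sorted, int count)
{
    for (int i = 1; i < count; i++)
    {
        if (sorted[i - 1].max >= sorted[i].min)
        {
            return true;
        }
    }
    return false;
}

/* e.g. [0,10] and [10,20] overlap, because 10 >= 10 */
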
/*
* CitusHasBeenLoaded returns true if the citus extension has been created
* in the current database and the extension script has been executed. Otherwise,
@ -2153,6 +2217,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
cacheEntry->hasOverlappingShardInterval = false;
}
@ -2415,8 +2480,11 @@ LookupShardRelation(int64 shardId)
*/
static void
GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Oid *columnTypeId, int32 *columnTypeMod,
Oid *intervalTypeId, int32 *intervalTypeMod)
{
*columnTypeId = InvalidOid;
*columnTypeMod = -1;
*intervalTypeId = InvalidOid;
*intervalTypeMod = -1;
@ -2431,18 +2499,25 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
*intervalTypeId = partitionColumn->vartype;
*intervalTypeMod = partitionColumn->vartypmod;
*columnTypeId = partitionColumn->vartype;
*columnTypeMod = partitionColumn->vartypmod;
break;
}
case DISTRIBUTE_BY_HASH:
{
Node *partitionNode = stringToNode(partitionKeyString);
Var *partitionColumn = (Var *) partitionNode;
Assert(IsA(partitionNode, Var));
*intervalTypeId = INT4OID;
*columnTypeId = partitionColumn->vartype;
*columnTypeMod = partitionColumn->vartypmod;
break;
}
case DISTRIBUTE_BY_NONE:
{
*intervalTypeId = InvalidOid;
break;
}
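
To illustrate the new out-parameters (hypothetical call; the function is static to metadata_cache.c): for a table hash-distributed on a text column, the two type ids differ, because shard bounds store int4 hash values rather than column values.

Oid columnTypeId = InvalidOid;
int32 columnTypeMod = -1;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;

GetPartitionTypeInputInfo(partitionKeyString, DISTRIBUTE_BY_HASH,
                          &columnTypeId, &columnTypeMod,
                          &intervalTypeId, &intervalTypeMod);

/* columnTypeId is the column's own type (here TEXTOID), whereas
 * intervalTypeId is INT4OID: hash shard bounds are int4 hash ranges */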

View File

@ -16,6 +16,7 @@
#include "catalog/pg_type.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h"
#include "distributed/shard_pruning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_protocol.h"
@ -23,7 +24,6 @@
#include "utils/memutils.h"
static int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
static int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
@ -247,13 +247,14 @@ FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry)
* the searched value. Note that the searched value must be the hashed value
* of the original value if the distribution method is hash.
*
* Note that, if the searched value can not be found for hash partitioned tables,
* we error out. This should only happen if something is terribly wrong, either
* metadata tables are corrupted or we have a bug somewhere. Such as a hash
* function which returns a value not in the range of [INT32_MIN, INT32_MAX] can
* fire this.
* Note that, if the searched value can not be found for hash partitioned
* tables, we error out (unless there are no shards, in which case
* INVALID_SHARD_INDEX is returned). This should only happen if something is
* terribly wrong, either metadata tables are corrupted or we have a bug
* somewhere. For example, a hash function that returns a value outside the
* range [INT32_MIN, INT32_MAX] can trigger this.
*/
static int
int
FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
{
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;
@ -264,6 +265,11 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
!cacheEntry->hasUniformHashDistribution);
int shardIndex = INVALID_SHARD_INDEX;
if (shardCount == 0)
{
return INVALID_SHARD_INDEX;
}
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
if (useBinarySearch)
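
A hedged usage sketch of the newly exported function, assuming hashedValue is the int4 hash of a partition column value:

int shardIndex = FindShardIntervalIndex(Int32GetDatum(hashedValue),
                                        cacheEntry);
if (shardIndex == INVALID_SHARD_INDEX)
{
    /* the table has no shards; callers must handle this explicitly now */
}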

View File

@ -38,6 +38,7 @@ typedef struct
bool isDistributedTable;
bool hasUninitializedShardInterval;
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
bool hasOverlappingShardInterval;
/* pg_dist_partition metadata for this table */
char *partitionKeyString;
@ -49,7 +50,15 @@ typedef struct
int shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray;
FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */
/* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */
FmgrInfo *shardColumnCompareFunction;
/*
* Comparator for partition interval type (different from
* shardColumnCompareFunction if hash-partitioned), NULL if
* DISTRIBUTE_BY_NONE.
*/
FmgrInfo *shardIntervalCompareFunction;
FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */
/* pg_dist_shard_placement metadata */

View File

@ -253,10 +253,6 @@ extern StringInfo ShardFetchQueryString(uint64 shardId);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
/* Function declarations for shard pruning */
extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
List *shardList);
extern bool ContainsFalseClause(List *whereClauseList);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
/*

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* shard_pruning.h
* Shard pruning infrastructure.
*
* Copyright (c) 2014-2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARD_PRUNING_H_
#define SHARD_PRUNING_H_
#include "distributed/metadata_cache.h"
#include "nodes/primnodes.h"
#define INVALID_SHARD_INDEX -1
/* Function declarations for shard pruning */
extern List * PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList);
extern bool ContainsFalseClause(List *whereClauseList);
#endif /* SHARD_PRUNING_H_ */
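
Together with the call-site hunks above, these two declarations are the module's entire public surface. A usage sketch mirroring the updated callers (relationId and whereClauseList supplied by the caller):

Index rangeTableId = 1;
List *prunedShardIntervalList =
    PruneShards(relationId, rangeTableId, whereClauseList);

if (prunedShardIntervalList == NIL)
{
    /* every shard was pruned away: the query produces no rows */
}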

View File

@ -35,6 +35,7 @@ extern int CompareRelationShards(const void *leftElement,
extern int ShardIndex(ShardInterval *shardInterval);
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
DistTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
extern bool SingleReplicatedTable(Oid relationId);
#endif /* SHARDINTERVAL_UTILS_H_ */

View File

@ -10,8 +10,13 @@ CREATE TYPE test_composite_type AS (
);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_composite_type_function(test_composite_type, test_composite_type) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION cmp_test_composite_type_function(test_composite_type, test_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
@ -34,7 +39,8 @@ RETURNS NULL ON NULL INPUT;
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS cats_op_fam_clas3
DEFAULT FOR TYPE test_composite_type USING BTREE AS
OPERATOR 3 = (test_composite_type, test_composite_type);
OPERATOR 3 = (test_composite_type, test_composite_type),
FUNCTION 1 cmp_test_composite_type_function(test_composite_type, test_composite_type);
CREATE OPERATOR CLASS cats_op_fam_class
DEFAULT FOR TYPE test_composite_type USING HASH AS
OPERATOR 1 = (test_composite_type, test_composite_type),

View File

@ -69,21 +69,21 @@ SELECT prune_using_single_value('pruning', NULL);
SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
prune_using_either_value
--------------------------
{800001,800002}
{800002,800001}
(1 row)
-- an AND clause with incompatible values returns no shards
-- an AND clause with values on different shards returns no shards
SELECT prune_using_both_values('pruning', 'tomato', 'petunia');
prune_using_both_values
-------------------------
{}
(1 row)
-- but if both values are on the same shard, should get back that shard
-- even if both values are on the same shard, a value can't be equal to two others
SELECT prune_using_both_values('pruning', 'tomato', 'rose');
prune_using_both_values
-------------------------
{800002}
{}
(1 row)
-- unit test of the equality expression generation code

View File

@ -166,7 +166,7 @@ FROM
-- load some more data
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- Update shards so that they do not have 1-1 matching. We should error here.
-- Update shards so that they do not have 1-1 matching, triggering an error.
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
SELECT

View File

@ -228,15 +228,14 @@ LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_
-- load some more data
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- Update shards so that they do not have 1-1 matching. We should error here.
-- Update shards so that they do not have 1-1 matching, triggering an error.
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
ERROR: hash partitioned table has overlapping shards
UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006;
-- empty tables

View File

@ -15,8 +15,14 @@ CREATE TYPE test_composite_type AS (
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_composite_type_function(test_composite_type, test_composite_type) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
LANGUAGE 'internal'
AS 'record_eq'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION cmp_test_composite_type_function(test_composite_type, test_composite_type) RETURNS int
LANGUAGE 'internal'
AS 'btrecordcmp'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -44,7 +50,8 @@ RETURNS NULL ON NULL INPUT;
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS cats_op_fam_clas3
DEFAULT FOR TYPE test_composite_type USING BTREE AS
OPERATOR 3 = (test_composite_type, test_composite_type);
OPERATOR 3 = (test_composite_type, test_composite_type),
FUNCTION 1 cmp_test_composite_type_function(test_composite_type, test_composite_type);
CREATE OPERATOR CLASS cats_op_fam_class
DEFAULT FOR TYPE test_composite_type USING HASH AS

View File

@ -59,10 +59,10 @@ SELECT prune_using_single_value('pruning', NULL);
-- build an OR clause and expect more than one shard
SELECT prune_using_either_value('pruning', 'tomato', 'petunia');
-- an AND clause with incompatible values returns no shards
-- an AND clause with values on different shards returns no shards
SELECT prune_using_both_values('pruning', 'tomato', 'petunia');
-- but if both values are on the same shard, should get back that shard
-- even if both values are on the same shard, a value can't be equal to two others
SELECT prune_using_both_values('pruning', 'tomato', 'rose');
-- unit test of the equality expression generation code