mirror of https://github.com/citusdata/citus.git
6x improvement in planning speed with very ~large tables
parent
fbc2ca7468
commit
509ffbfc54
|
@ -139,7 +139,7 @@ static bool JoinOnPartitionColumn(Query *query);
|
|||
static void ErrorIfUnsupportedShardDistribution(Query *query);
|
||||
static List * RelationIdList(Query *query);
|
||||
static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
|
||||
static bool ShardIntervalsEqual(ShardInterval *firstInterval,
|
||||
static bool ShardIntervalsEqual(FmgrInfo* comparisonFunction, ShardInterval *firstInterval,
|
||||
ShardInterval *secondInterval);
|
||||
static void ErrorIfUnsupportedFilters(Query *subquery);
|
||||
static bool EqualOpExpressionLists(List *firstOpExpressionList,
|
||||
|
@ -3483,8 +3483,11 @@ RelationIdList(Query *query)
|
|||
static bool
|
||||
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
||||
{
|
||||
FmgrInfo *comparisonFunction = NULL;
|
||||
bool coPartitionedTables = true;
|
||||
uint32 intervalIndex = 0;
|
||||
Oid typeId = InvalidOid;
|
||||
|
||||
DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
|
||||
DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
|
||||
ShardInterval *sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
|
||||
|
@ -3503,12 +3506,17 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
|||
return true;
|
||||
}
|
||||
|
||||
typeId = sortedFirstIntervalArray[0].valueTypeId;
|
||||
comparisonFunction = GetFunctionInfo(typeId, BTREE_AM_OID, BTORDER_PROC);
|
||||
|
||||
for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
|
||||
{
|
||||
ShardInterval *firstInterval = &sortedFirstIntervalArray[intervalIndex];
|
||||
ShardInterval *secondInterval = &sortedSecondIntervalArray[intervalIndex];
|
||||
|
||||
bool shardIntervalsEqual = ShardIntervalsEqual(firstInterval, secondInterval);
|
||||
bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
|
||||
firstInterval,
|
||||
secondInterval);
|
||||
if (!shardIntervalsEqual)
|
||||
{
|
||||
coPartitionedTables = false;
|
||||
|
@ -3524,19 +3532,15 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
|||
* ShardIntervalsEqual checks if given shard intervals have equal min/max values.
|
||||
*/
|
||||
static bool
|
||||
ShardIntervalsEqual(ShardInterval *firstInterval, ShardInterval *secondInterval)
|
||||
ShardIntervalsEqual(FmgrInfo* comparisonFunction, ShardInterval *firstInterval,
|
||||
ShardInterval *secondInterval)
|
||||
{
|
||||
Oid typeId = InvalidOid;
|
||||
FmgrInfo *comparisonFunction = NULL;
|
||||
bool shardIntervalsEqual = false;
|
||||
Datum firstMin = 0;
|
||||
Datum firstMax = 0;
|
||||
Datum secondMin = 0;
|
||||
Datum secondMax = 0;
|
||||
|
||||
typeId = firstInterval->valueTypeId;
|
||||
comparisonFunction = GetFunctionInfo(typeId, BTREE_AM_OID, BTORDER_PROC);
|
||||
|
||||
firstMin = firstInterval->minValue;
|
||||
firstMax = firstInterval->maxValue;
|
||||
secondMin = secondInterval->minValue;
|
||||
|
|
|
@ -2003,6 +2003,7 @@ SubquerySqlTaskList(Job *job)
|
|||
uint32 anchorRangeTableId = 0;
|
||||
uint32 rangeTableIndex = 0;
|
||||
const uint32 fragmentSize = sizeof(RangeTableFragment);
|
||||
uint64 maxTableSize = 0;
|
||||
|
||||
/* find filters on partition columns */
|
||||
ExtractQueryWalker((Node *) subquery, &queryList);
|
||||
|
@ -2026,7 +2027,6 @@ SubquerySqlTaskList(Job *job)
|
|||
|
||||
/* get list of all range tables in subquery tree */
|
||||
ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
|
||||
anchorRangeTableId = AnchorRangeTableId(rangeTableList);
|
||||
|
||||
/*
|
||||
* For each range table entry, first we prune shards for the relation
|
||||
|
@ -2045,6 +2045,7 @@ SubquerySqlTaskList(Job *job)
|
|||
uint32 tableId = rangeTableIndex + 1; /* tableId starts from 1 */
|
||||
uint32 finalShardCount = 0;
|
||||
uint32 shardIndex = 0;
|
||||
uint64 tableSize = 0;
|
||||
|
||||
if (opExpressionList != NIL)
|
||||
{
|
||||
|
@ -2073,6 +2074,8 @@ SubquerySqlTaskList(Job *job)
|
|||
{
|
||||
ShardInterval *shardInterval = sortedIntervalArray[shardIndex];
|
||||
|
||||
tableSize += ShardLength(shardInterval->shardId);
|
||||
|
||||
RangeTableFragment *shardFragment = palloc0(fragmentSize);
|
||||
shardFragment->fragmentReference = &(shardInterval->shardId);
|
||||
shardFragment->fragmentType = CITUS_RTE_RELATION;
|
||||
|
@ -2094,6 +2097,11 @@ SubquerySqlTaskList(Job *job)
|
|||
}
|
||||
}
|
||||
|
||||
if (anchorRangeTableId == 0 || tableSize > maxTableSize){
|
||||
maxTableSize = tableSize;
|
||||
anchorRangeTableId = tableId;
|
||||
}
|
||||
|
||||
rangeTableIndex++;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue