mirror of https://github.com/citusdata/citus.git
Merge pull request #608 from citusdata/fix_pruning_performance
Fix join pruning performancepull/1938/head
commit
260a1045e5
|
@ -2008,7 +2008,7 @@ SubquerySqlTaskList(Job *job)
|
|||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
|
||||
RangeTableFragment *shardFragment = palloc0(fragmentSize);
|
||||
shardFragment->fragmentReference = &(shardInterval->shardId);
|
||||
shardFragment->fragmentReference = shardInterval;
|
||||
shardFragment->fragmentType = CITUS_RTE_RELATION;
|
||||
shardFragment->rangeTableId = tableId;
|
||||
|
||||
|
@ -2449,7 +2449,7 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
|
|||
(ShardInterval *) lfirst(shardIntervalCell);
|
||||
|
||||
RangeTableFragment *shardFragment = palloc0(fragmentSize);
|
||||
shardFragment->fragmentReference = &shardInterval->shardId;
|
||||
shardFragment->fragmentReference = shardInterval;
|
||||
shardFragment->fragmentType = CITUS_RTE_RELATION;
|
||||
shardFragment->rangeTableId = tableId;
|
||||
|
||||
|
@ -3582,14 +3582,16 @@ FragmentInterval(RangeTableFragment *fragment)
|
|||
ShardInterval *fragmentInterval = NULL;
|
||||
if (fragment->fragmentType == CITUS_RTE_RELATION)
|
||||
{
|
||||
uint64 *shardId = (uint64 *) fragment->fragmentReference;
|
||||
|
||||
fragmentInterval = LoadShardInterval(*shardId);
|
||||
Assert(CitusIsA(fragment->fragmentReference, ShardInterval));
|
||||
fragmentInterval = (ShardInterval *) fragment->fragmentReference;
|
||||
}
|
||||
else if (fragment->fragmentType == CITUS_RTE_REMOTE_QUERY)
|
||||
{
|
||||
Task *mergeTask = (Task *) fragment->fragmentReference;
|
||||
Task *mergeTask = NULL;
|
||||
|
||||
Assert(CitusIsA(fragment->fragmentReference, Task));
|
||||
|
||||
mergeTask = (Task *) fragment->fragmentReference;
|
||||
fragmentInterval = mergeTask->shardInterval;
|
||||
}
|
||||
|
||||
|
@ -3601,17 +3603,16 @@ FragmentInterval(RangeTableFragment *fragment)
|
|||
bool
|
||||
ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval)
|
||||
{
|
||||
Oid typeId = InvalidOid;
|
||||
FmgrInfo *comparisonFunction = NULL;
|
||||
bool nonOverlap = false;
|
||||
DistTableCacheEntry *intervalRelation =
|
||||
DistributedTableCacheEntry(firstInterval->relationId);
|
||||
FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction;
|
||||
|
||||
Datum firstMin = 0;
|
||||
Datum firstMax = 0;
|
||||
Datum secondMin = 0;
|
||||
Datum secondMax = 0;
|
||||
|
||||
typeId = firstInterval->valueTypeId;
|
||||
comparisonFunction = GetFunctionInfo(typeId, BTREE_AM_OID, BTORDER_PROC);
|
||||
|
||||
firstMin = firstInterval->minValue;
|
||||
firstMax = firstInterval->maxValue;
|
||||
|
@ -3688,22 +3689,24 @@ UniqueFragmentList(List *fragmentList)
|
|||
|
||||
foreach(fragmentCell, fragmentList)
|
||||
{
|
||||
uint64 *shardId = NULL;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
bool shardIdAlreadyAdded = false;
|
||||
ListCell *uniqueFragmentCell = NULL;
|
||||
|
||||
RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
|
||||
Assert(fragment->fragmentType == CITUS_RTE_RELATION);
|
||||
|
||||
shardId = fragment->fragmentReference;
|
||||
Assert(CitusIsA(fragment->fragmentReference, ShardInterval));
|
||||
shardInterval = (ShardInterval *) fragment->fragmentReference;
|
||||
|
||||
foreach(uniqueFragmentCell, uniqueFragmentList)
|
||||
{
|
||||
RangeTableFragment *uniqueFragment =
|
||||
(RangeTableFragment *) lfirst(uniqueFragmentCell);
|
||||
uint64 *uniqueShardId = uniqueFragment->fragmentReference;
|
||||
ShardInterval *uniqueShardInterval =
|
||||
(ShardInterval *) uniqueFragment->fragmentReference;
|
||||
|
||||
if (*shardId == *uniqueShardId)
|
||||
if (shardInterval->shardId == uniqueShardInterval->shardId)
|
||||
{
|
||||
shardIdAlreadyAdded = true;
|
||||
break;
|
||||
|
@ -3735,12 +3738,13 @@ DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragmentList)
|
|||
RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
|
||||
if (fragment->fragmentType == CITUS_RTE_RELATION)
|
||||
{
|
||||
uint64 *shardId = fragment->fragmentReference;
|
||||
StringInfo shardFetchQueryString = ShardFetchQueryString(*shardId);
|
||||
ShardInterval *shardInterval = fragment->fragmentReference;
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
StringInfo shardFetchQueryString = ShardFetchQueryString(shardId);
|
||||
|
||||
Task *shardFetchTask = CreateBasicTask(jobId, taskIdIndex, SHARD_FETCH_TASK,
|
||||
shardFetchQueryString->data);
|
||||
shardFetchTask->shardId = (*shardId);
|
||||
shardFetchTask->shardId = shardId;
|
||||
|
||||
dataFetchTaskList = lappend(dataFetchTaskList, shardFetchTask);
|
||||
taskIdIndex++;
|
||||
|
@ -3970,7 +3974,8 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
|
|||
if (fragmentType == CITUS_RTE_RELATION)
|
||||
{
|
||||
char *shardAliasName = NULL;
|
||||
uint64 *shardId = fragment->fragmentReference;
|
||||
ShardInterval *shardInterval = (ShardInterval *) fragment->fragmentReference;
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
@ -3992,7 +3997,7 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
|
|||
* If user specified a shard name in pg_dist_shard, use that name in alias.
|
||||
* Otherwise, set shard name in alias to <relation_name>_<shard_id>.
|
||||
*/
|
||||
shardAliasName = LoadShardAlias(relationId, *shardId);
|
||||
shardAliasName = LoadShardAlias(relationId, shardId);
|
||||
if (shardAliasName != NULL)
|
||||
{
|
||||
fragmentName = shardAliasName;
|
||||
|
@ -4000,7 +4005,7 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
|
|||
else
|
||||
{
|
||||
char *shardName = pstrdup(relationName);
|
||||
AppendShardIdToName(&shardName, *shardId);
|
||||
AppendShardIdToName(&shardName, shardId);
|
||||
|
||||
fragmentName = shardName;
|
||||
}
|
||||
|
@ -4058,11 +4063,13 @@ AnchorShardId(List *fragmentList, uint32 anchorRangeTableId)
|
|||
RangeTableFragment *fragment = (RangeTableFragment *) lfirst(fragmentCell);
|
||||
if (fragment->rangeTableId == anchorRangeTableId)
|
||||
{
|
||||
uint64 *shardIdPointer = NULL;
|
||||
Assert(fragment->fragmentType == CITUS_RTE_RELATION);
|
||||
ShardInterval *shardInterval = NULL;
|
||||
|
||||
shardIdPointer = (uint64 *) fragment->fragmentReference;
|
||||
anchorShardId = (*shardIdPointer);
|
||||
Assert(fragment->fragmentType == CITUS_RTE_RELATION);
|
||||
Assert(CitusIsA(fragment->fragmentReference, ShardInterval));
|
||||
|
||||
shardInterval = (ShardInterval *) fragment->fragmentReference;
|
||||
anchorShardId = shardInterval->shardId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue