mirror of https://github.com/citusdata/citus.git
Check equivalence on reference tables for subquery pushdown
parent
a6d40b8bc5
commit
45717dd013
|
@ -67,7 +67,6 @@ static Var * FindTranslatedVar(List *appendRelList, Oid relationOid,
|
|||
static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
||||
RelationRestrictionContext *
|
||||
restrictionContext);
|
||||
static uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
|
||||
static List * GenerateAttributeEquivalencesForRelationRestrictions(
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
static AttributeEquivalenceClass * AttributeEquivalenceClassForEquivalenceClass(
|
||||
|
@ -314,10 +313,9 @@ FindTranslatedVar(List *appendRelList, Oid relationOid, Index relationRteIndex,
|
|||
* joined on their partition keys.
|
||||
*
|
||||
* The function returns true if all relations are joined on their partition keys.
|
||||
* Otherwise, the function returns false. Since reference tables do not have partition
|
||||
* keys, we skip processing them. Also, if the query includes only a single non-reference
|
||||
* distributed relation, the function returns true since it doesn't make sense to check
|
||||
* for partition key equality in that case.
|
||||
* Otherwise, the function returns false. In order to support reference tables
|
||||
* with subqueries, equality between attributes of reference tables and partition
|
||||
* key of distributed tables are also considered.
|
||||
*
|
||||
* In order to do that, we invented a new equivalence class namely:
|
||||
* AttributeEquivalenceClass. In very simple words, a AttributeEquivalenceClass is
|
||||
|
@ -350,24 +348,15 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
|
|||
List *relationRestrictionAttributeEquivalenceList = NIL;
|
||||
List *joinRestrictionAttributeEquivalenceList = NIL;
|
||||
List *allAttributeEquivalenceList = NIL;
|
||||
uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext);
|
||||
|
||||
uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList);
|
||||
uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount;
|
||||
|
||||
/*
|
||||
* If the query includes a single relation which is not a reference table,
|
||||
* we should not check the partition column equality.
|
||||
* Consider two example cases:
|
||||
* (i) The query includes only a single colocated relation
|
||||
* (ii) A colocated relation is joined with a (or multiple) reference
|
||||
* table(s) where colocated relation is not joined on the partition key
|
||||
*
|
||||
* For the above two cases, we don't need to execute the partition column equality
|
||||
* algorithm. The reason is that the essence of this function is to ensure that the
|
||||
* tasks that are going to be created should not need data from other tasks. In both
|
||||
* cases mentioned above, the necessary data per task would be on available.
|
||||
* If the query includes only one relation, we should not check the partition
|
||||
* column equality. Single table should not need to fetch data from other nodes
|
||||
* except it's own node(s).
|
||||
*/
|
||||
if (nonReferenceRelationCount <= 1)
|
||||
if (totalRelationCount == 1)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -429,8 +418,7 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
|||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
||||
|
||||
if (DistPartitionKey(relationRestriction->relationId) &&
|
||||
!bms_is_member(rteIdentity, commonRteIdentities))
|
||||
if (!bms_is_member(rteIdentity, commonRteIdentities))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -440,31 +428,6 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReferenceRelationCount iterates over the relations and returns the reference table
|
||||
* relation count.
|
||||
*/
|
||||
static uint32
|
||||
ReferenceRelationCount(RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
ListCell *relationRestrictionCell = NULL;
|
||||
uint32 referenceRelationCount = 0;
|
||||
|
||||
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
|
||||
{
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||
|
||||
if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
referenceRelationCount++;
|
||||
}
|
||||
}
|
||||
|
||||
return referenceRelationCount;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateAttributeEquivalencesForRelationRestrictions gets a relation restriction
|
||||
* context and returns a list of AttributeEquivalenceClass.
|
||||
|
@ -642,7 +605,7 @@ GetVarFromAssignedParam(List *parentPlannerParamList, Param *plannerParam)
|
|||
|
||||
/*
|
||||
* GenerateCommonEquivalence gets a list of unrelated AttributeEquiavalanceClass
|
||||
* whose all members are partition keys.
|
||||
* whose all members are partition keys or a column of reference table.
|
||||
*
|
||||
* With the equivalence classes, the function follows the algorithm
|
||||
* outlined below:
|
||||
|
@ -1092,9 +1055,6 @@ AddUnionSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
|||
* class using the rteIdentity provided by the rangeTableEntry. Note that
|
||||
* rteIdentities are only assigned to RTE_RELATIONs and this function asserts
|
||||
* the input rte to be an RTE_RELATION.
|
||||
*
|
||||
* Note that this function only adds partition keys to the attributeEquivalanceClass.
|
||||
* This implies that there wouldn't be any columns for reference tables.
|
||||
*/
|
||||
static void
|
||||
AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
||||
|
@ -1103,19 +1063,13 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
|||
Var *varToBeAdded)
|
||||
{
|
||||
AttributeEquivalenceClassMember *attributeEqMember = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
Var *relationPartitionKey = NULL;
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
Var *relationPartitionKey = DistPartitionKey(relationId);
|
||||
|
||||
Assert(rangeTableEntry->rtekind == RTE_RELATION);
|
||||
|
||||
relationId = rangeTableEntry->relid;
|
||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
relationPartitionKey = DistPartitionKey(relationId);
|
||||
if (relationPartitionKey->varattno != varToBeAdded->varattno)
|
||||
if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
|
||||
relationPartitionKey->varattno != varToBeAdded->varattno)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -1133,12 +1133,8 @@ FROM
|
|||
WHERE
|
||||
colocated_table_test_2.value_4 = reference_table_test.value_4
|
||||
RETURNING value_1, value_2;
|
||||
value_1 | value_2
|
||||
---------+---------
|
||||
1 | 1
|
||||
2 | 2
|
||||
(2 rows)
|
||||
|
||||
ERROR: cannot perform distributed planning for the given modification
|
||||
DETAIL: Select query cannot be pushed down to the worker.
|
||||
-- some more complex queries (Note that there are more complex queries in multi_insert_select.sql)
|
||||
INSERT INTO
|
||||
colocated_table_test (value_1, value_2)
|
||||
|
@ -1149,12 +1145,8 @@ FROM
|
|||
WHERE
|
||||
colocated_table_test_2.value_2 = reference_table_test.value_2
|
||||
RETURNING value_1, value_2;
|
||||
value_1 | value_2
|
||||
---------+---------
|
||||
1 | 1
|
||||
2 | 2
|
||||
(2 rows)
|
||||
|
||||
ERROR: cannot perform distributed planning for the given modification
|
||||
DETAIL: Select query cannot be pushed down to the worker.
|
||||
-- partition column value comes from reference table, goes via coordinator
|
||||
INSERT INTO
|
||||
colocated_table_test (value_1, value_2)
|
||||
|
@ -1615,7 +1607,7 @@ INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
|||
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
||||
master_modify_multiple_shards
|
||||
-------------------------------
|
||||
10
|
||||
6
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
|
|
Loading…
Reference in New Issue