RestrictionEquivalenceForPartitionKeys

multi-column-distribution
Jelte Fennema 2021-06-09 18:06:31 +02:00
parent 5f748eeea0
commit 35061882b0
4 changed files with 280 additions and 77 deletions

View File

@ -31,8 +31,12 @@
- [ ] PostprocessAlterTableStmtAttachPartition()
- [x] TargetListOnPartitionColumn()
- [ ] PartitionColumnForPushedDownSubquery()
- [ ] RestrictionEquivalenceForPartitionKeys()
# query pushdown planner
- [x] RestrictionEquivalenceForPartitionKeys()
- [x] EquivalenceListContainsRelationsEquality()
- [ ] SafeToPushdownUnionSubquery()
- [ ] RelationRestrictionPartitionKeyIndex()
- [ ] PartitionMethod()

View File

@ -126,18 +126,19 @@ static bool AttributeClassContainsAttributeClassMember(AttributeEquivalenceClass
static List * AddAttributeClassToAttributeClassList(List *attributeEquivalenceList,
AttributeEquivalenceClass *
attributeEquivalence);
static bool AttributeEquivalencesAreEqual(AttributeEquivalenceClass *
firstAttributeEquivalence,
AttributeEquivalenceClass *
secondAttributeEquivalence);
static AttributeEquivalenceClass * GenerateCommonEquivalence(List *
attributeEquivalenceList,
RelationRestrictionContext *
relationRestrictionContext);
static AttributeEquivalenceClass * GenerateEquivalenceClassForRelationRestriction(
RelationRestrictionContext
*
relationRestrictionContext);
static bool AttributeEquivalencesAreEqual(
AttributeEquivalenceClass *firstAttributeEquivalence,
AttributeEquivalenceClass *
secondAttributeEquivalence);
static AttributeEquivalenceClass * GenerateCommonEquivalence(
List *attributeEquivalenceList,
RelationRestrictionContext *
relationRestrictionContext,
AttributeEquivalenceClass *
firstEquivalenceClass);
static List * GenerateEquivalenceClassesForRelationRestriction(RelationRestrictionContext
*
relationRestrictionContext);
static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass *
firstClass,
AttributeEquivalenceClass *
@ -154,6 +155,7 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
rangeTableArrayLength, Relids
queryRteIdentities);
static Relids QueryRteIdentities(Query *queryTree);
static bool HasVarattnoInVarList(List *varList, AttrNumber varattno);
#if PG_VERSION_NUM >= PG_VERSION_13
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
@ -653,46 +655,53 @@ bool
EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *restrictionContext)
{
ListCell *commonEqClassCell = NULL;
ListCell *relationRestrictionCell = NULL;
Relids commonRteIdentities = NULL;
/*
* In general we're trying to expand existing the equivalence classes to find a
* common equivalence class. The main goal is to test whether this main class
* contains all partition keys of the existing relations.
*/
AttributeEquivalenceClass *commonEquivalenceClass = GenerateCommonEquivalence(
attributeEquivalenceList,
List *firstEquivalenceClassList = GenerateEquivalenceClassesForRelationRestriction(
restrictionContext);
/* add the rte indexes of relations to a bitmap */
foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes)
AttributeEquivalenceClass *firstEquivalenceClass = NULL;
foreach_ptr(firstEquivalenceClass, firstEquivalenceClassList)
{
AttributeEquivalenceClassMember *classMember =
(AttributeEquivalenceClassMember *) lfirst(commonEqClassCell);
int rteIdentity = classMember->rteIdentity;
/*
* In general we're trying to expand the existing equivalence classes to find a
* common equivalence class. The main goal is to test whether this main class
* contains all partition keys of the existing relations.
*/
AttributeEquivalenceClass *commonEquivalenceClass = GenerateCommonEquivalence(
attributeEquivalenceList,
restrictionContext,
firstEquivalenceClass);
commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity);
}
Relids commonRteIdentities = NULL;
/* check whether all relations exists in the main restriction list */
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
AttributeEquivalenceClassMember *classMember = NULL;
/* we shouldn't check for the equality of non-distributed tables */
if (IsCitusTableType(relationRestriction->relationId,
CITUS_TABLE_WITH_NO_DIST_KEY))
/* add the rte indexes of relations to a bitmap */
foreach_ptr(classMember, commonEquivalenceClass->equivalentAttributes)
{
continue;
int rteIdentity = classMember->rteIdentity;
commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity);
}
if (!bms_is_member(rteIdentity, commonRteIdentities))
RelationRestriction *relationRestriction = NULL;
/* check whether all relations exist in the main restriction list */
foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList)
{
return false;
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
/* we shouldn't check for the equality of non-distributed tables */
if (IsCitusTableType(relationRestriction->relationId,
CITUS_TABLE_WITH_NO_DIST_KEY))
{
continue;
}
if (!bms_is_member(rteIdentity, commonRteIdentities))
{
return false;
}
}
}
@ -928,7 +937,8 @@ SearchPlannerParamList(List *plannerParamList, Param *plannerParam)
*/
static AttributeEquivalenceClass *
GenerateCommonEquivalence(List *attributeEquivalenceList,
RelationRestrictionContext *relationRestrictionContext)
RelationRestrictionContext *relationRestrictionContext,
AttributeEquivalenceClass *firstEquivalenceClass)
{
Bitmapset *addedEquivalenceIds = NULL;
uint32 equivalenceListSize = list_length(attributeEquivalenceList);
@ -938,14 +948,6 @@ GenerateCommonEquivalence(List *attributeEquivalenceList,
sizeof(AttributeEquivalenceClass));
commonEquivalenceClass->equivalenceId = 0;
/*
* We seed the common equivalence class with a the first distributed
* table since we always want the input distributed relations to be
* on the common class.
*/
AttributeEquivalenceClass *firstEquivalenceClass =
GenerateEquivalenceClassForRelationRestriction(relationRestrictionContext);
/* we skip the calculation if there is not enough information */
if (equivalenceListSize < 1 || firstEquivalenceClass == NULL)
{
@ -989,9 +991,9 @@ GenerateCommonEquivalence(List *attributeEquivalenceList,
ListConcatUniqueAttributeClassMemberLists(commonEquivalenceClass,
currentEquivalenceClass);
addedEquivalenceIds = bms_add_member(addedEquivalenceIds,
currentEquivalenceClass->
equivalenceId);
addedEquivalenceIds = bms_add_member(
addedEquivalenceIds,
currentEquivalenceClass->equivalenceId);
/*
* It seems inefficient to start from the beginning.
@ -1019,27 +1021,38 @@ GenerateCommonEquivalence(List *attributeEquivalenceList,
/*
* GenerateEquivalenceClassForRelationRestriction generates an AttributeEquivalenceClass
* with a single AttributeEquivalenceClassMember.
* GenerateEquivalenceClassesForRelationRestriction generates a list of
* AttributeEquivalenceClasses that all have a single
* AttributeEquivalenceClassMember. The list contains one
* AttributeEquivalenceClass for each of the distribution columns of the
* first distributed relation in the relationRestrictionContext.
*/
static AttributeEquivalenceClass *
GenerateEquivalenceClassForRelationRestriction(
static List *
GenerateEquivalenceClassesForRelationRestriction(
RelationRestrictionContext *relationRestrictionContext)
{
ListCell *relationRestrictionCell = NULL;
AttributeEquivalenceClassMember *eqMember = NULL;
AttributeEquivalenceClass *eqClassForRelation = NULL;
foreach(relationRestrictionCell, relationRestrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId);
List *relationPartitionKeyList = DistPartitionKeys(
relationRestriction->relationId);
if (relationPartitionKey)
if (list_length(relationPartitionKeyList) == 0)
{
eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass));
eqMember = palloc0(sizeof(AttributeEquivalenceClassMember));
continue;
}
List *equivalenceClasses = NIL;
Var *relationPartitionKey = NULL;
foreach_ptr(relationPartitionKey, relationPartitionKeyList)
{
AttributeEquivalenceClass *eqClassForRelation = palloc0(
sizeof(AttributeEquivalenceClass));
AttributeEquivalenceClassMember *eqMember = palloc0(
sizeof(AttributeEquivalenceClassMember));
eqMember->relationId = relationRestriction->relationId;
eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte);
eqMember->varno = relationRestriction->index;
@ -1047,12 +1060,12 @@ GenerateEquivalenceClassForRelationRestriction(
eqClassForRelation->equivalentAttributes =
lappend(eqClassForRelation->equivalentAttributes, eqMember);
break;
equivalenceClasses = lappend(equivalenceClasses, eqClassForRelation);
}
return equivalenceClasses;
}
return eqClassForRelation;
return NIL;
}
@ -1532,18 +1545,12 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass *
return;
}
Var *relationPartitionKey = DistPartitionKey(relationId);
List *relationPartitionKeyList = DistPartitionKeys(relationId);
Assert(rangeTableEntry->rtekind == RTE_RELATION);
/* we don't need reference tables in the equality on columns */
if (relationPartitionKey == NULL)
{
return;
}
/* we're only interested in distribution columns */
if (relationPartitionKey->varattno != varToBeAdded->varattno)
if (!HasVarattnoInVarList(relationPartitionKeyList, varToBeAdded->varattno))
{
return;
}
@ -1562,6 +1569,21 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass *
}
/*
 * HasVarattnoInVarList returns true if at least one Var in varList has an
 * attribute number equal to the given varattno. It returns false when no
 * entry matches, which includes the case where the list has no entries.
 */
static bool
HasVarattnoInVarList(List *varList, AttrNumber varattno)
{
	bool attributeFound = false;

	Var *candidateVar = NULL;
	foreach_ptr(candidateVar, varList)
	{
		if (candidateVar->varattno != varattno)
		{
			continue;
		}

		/* one match is enough, no need to look at the remaining entries */
		attributeFound = true;
		break;
	}

	return attributeFound;
}
/*
* AttributeClassContainsAttributeClassMember returns true if the input class member
* already exists in the attributeEquivalenceClass. An equality is identified by the

View File

@ -24,12 +24,20 @@ select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none
(1 row)
create table t3(id int, id2 int, b int);
select create_distributed_table('t3', ARRAY['id', 'id2'], colocate_with := 't2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
---------------------------------------------------------------------
t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}"}
t2 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"}
(2 rows)
t3 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"}
(3 rows)
INSERT INTO t2 VALUES
(1, 1, 1),
@ -39,6 +47,11 @@ INSERT INTO t2 VALUES
(2, 3, 5),
(2, 4, 5)
;
INSERT INTO t3 VALUES
(1, 1, 1),
(2, 2, 2),
(2, 4, 4)
;
-- partitioning by both distribution columns pushes the window function down
SELECT id, id2, a, rnk FROM
(
@ -191,5 +204,114 @@ ORDER BY
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20)
(20 rows)
-- Can pushdown if joining on both distribution columns
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
id | id2 | a | b
---------------------------------------------------------------------
1 | 1 | 1 | 1
1 | 1 | 2 | 1
1 | 1 | 4 | 1
2 | 4 | 5 | 4
(4 rows)
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
QUERY PLAN
---------------------------------------------------------------------
Sort (cost=8304.82..8554.82 rows=100000 width=16)
Sort Key: remote_scan.id, remote_scan.id2, remote_scan.a, remote_scan.b
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge Join (cost=285.08..316.72 rows=104 width=16)
Merge Cond: ((t2.id = t3.id) AND (t2.id2 = t3.id2))
-> Sort (cost=142.54..147.64 rows=2040 width=12)
Sort Key: t2.id, t2.id2
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
-> Sort (cost=142.54..147.64 rows=2040 width=12)
Sort Key: t3.id, t3.id2
-> Seq Scan on t3_27905508 t3 (cost=0.00..30.40 rows=2040 width=12)
(15 rows)
-- Cannot pushdown if not joining on both distribution columns
-- NOTE: This currently returns a result, because the logical planner will take
-- over and that doesn't know about. In the EXPLAIN below you can see that it's
-- not pushed down.
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id
) foo
ORDER BY 1, 2, 3, 4;
id | id2 | a | b
---------------------------------------------------------------------
1 | 1 | 1 | 1
1 | 1 | 2 | 1
1 | 1 | 4 | 1
2 | 3 | 4 | 2
2 | 3 | 4 | 4
2 | 3 | 5 | 2
2 | 3 | 5 | 4
2 | 4 | 5 | 2
2 | 4 | 5 | 4
(9 rows)
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id
) foo
ORDER BY 1, 2, 3, 4;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge Join (cost=285.08..607.40 rows=20808 width=16)
Merge Cond: (t2.id = t3.id)
-> Sort (cost=142.54..147.64 rows=2040 width=12)
Sort Key: t2.id
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
-> Sort (cost=142.54..147.64 rows=2040 width=8)
Sort Key: t3.id
-> Seq Scan on t3_27905508 t3 (cost=0.00..30.40 rows=2040 width=8)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Sort (cost=59.83..62.33 rows=1000 width=16)
Sort Key: intermediate_result.id, intermediate_result.id2, intermediate_result.a, intermediate_result.b
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=16)
(22 rows)
-- Cannot pushdown if not joining on both distribution columns
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET client_min_messages TO WARNING;
DROP SCHEMA multi_column_distribution CASCADE;

View File

@ -11,6 +11,9 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
create table t2(id int, id2 int, a int);
select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none');
create table t3(id int, id2 int, b int);
select create_distributed_table('t3', ARRAY['id', 'id2'], colocate_with := 't2');
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
INSERT INTO t2 VALUES
@ -22,6 +25,12 @@ INSERT INTO t2 VALUES
(2, 4, 5)
;
INSERT INTO t3 VALUES
(1, 1, 1),
(2, 2, 2),
(2, 4, 4)
;
-- partitioning by both distribution columns pushes the window function down
SELECT id, id2, a, rnk FROM
(
@ -86,5 +95,51 @@ ORDER BY
rnk, id, id2, a;
-- Can pushdown if joining on both distribution columns
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
-- Cannot pushdown if not joining on both distribution columns
-- NOTE: This currently returns a result, because the logical planner will take
-- over and that doesn't know about. In the EXPLAIN below you can see that it's
-- not pushed down.
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id
) foo
ORDER BY 1, 2, 3, 4;
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id = t3.id
) foo
ORDER BY 1, 2, 3, 4;
-- Cannot pushdown if not joining on both distribution columns
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
EXPLAIN
SELECT * FROM (
SELECT t2.id, t2.id2, a, b
FROM t2 JOIN t3 ON t2.id2 = t3.id2
) foo
ORDER BY 1, 2, 3, 4;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_column_distribution CASCADE;