diff --git a/multi_column.txt b/multi_column.txt index a56a34db9..21bba382f 100644 --- a/multi_column.txt +++ b/multi_column.txt @@ -31,8 +31,12 @@ - [ ] PostprocessAlterTableStmtAttachPartition() - [x] TargetListOnPartitionColumn() - [ ] PartitionColumnForPushedDownSubquery() -- [ ] RestrictionEquivalenceForPartitionKeys() + +# query pushdown planner +- [x] RestrictionEquivalenceForPartitionKeys() +- [x] EquivalenceListContainsRelationsEquality() - [ ] SafeToPushdownUnionSubquery() +- [ ] RelationRestrictionPartitionKeyIndex() - [ ] PartitionMethod() diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..1d7e89361 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -126,18 +126,19 @@ static bool AttributeClassContainsAttributeClassMember(AttributeEquivalenceClass static List * AddAttributeClassToAttributeClassList(List *attributeEquivalenceList, AttributeEquivalenceClass * attributeEquivalence); -static bool AttributeEquivalencesAreEqual(AttributeEquivalenceClass * - firstAttributeEquivalence, - AttributeEquivalenceClass * - secondAttributeEquivalence); -static AttributeEquivalenceClass * GenerateCommonEquivalence(List * - attributeEquivalenceList, - RelationRestrictionContext * - relationRestrictionContext); -static AttributeEquivalenceClass * GenerateEquivalenceClassForRelationRestriction( - RelationRestrictionContext - * - relationRestrictionContext); +static bool AttributeEquivalencesAreEqual( + AttributeEquivalenceClass *firstAttributeEquivalence, + AttributeEquivalenceClass * + secondAttributeEquivalence); +static AttributeEquivalenceClass * GenerateCommonEquivalence( + List *attributeEquivalenceList, + RelationRestrictionContext * + relationRestrictionContext, + AttributeEquivalenceClass * + firstEquivalenceClass); +static List * GenerateEquivalenceClassesForRelationRestriction(RelationRestrictionContext + * + relationRestrictionContext); static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass * firstClass, AttributeEquivalenceClass * @@ -154,6 +155,7 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn rangeTableArrayLength, Relids queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static bool HasVarattnoInVarList(List *varList, AttrNumber varattno); #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); @@ -653,46 +655,53 @@ bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, RelationRestrictionContext *restrictionContext) { - ListCell *commonEqClassCell = NULL; - ListCell *relationRestrictionCell = NULL; - Relids commonRteIdentities = NULL; - - /* - * In general we're trying to expand existing the equivalence classes to find a - * common equivalence class. The main goal is to test whether this main class - * contains all partition keys of the existing relations. - */ - AttributeEquivalenceClass *commonEquivalenceClass = GenerateCommonEquivalence( - attributeEquivalenceList, + List *firstEquivalenceClassList = GenerateEquivalenceClassesForRelationRestriction( restrictionContext); - /* add the rte indexes of relations to a bitmap */ - foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes) + + AttributeEquivalenceClass *firstEquivalenceClass = NULL; + foreach_ptr(firstEquivalenceClass, firstEquivalenceClassList) { - AttributeEquivalenceClassMember *classMember = - (AttributeEquivalenceClassMember *) lfirst(commonEqClassCell); - int rteIdentity = classMember->rteIdentity; + /* + * In general we're trying to expand existing the equivalence classes to find a + * common equivalence class. The main goal is to test whether this main class + * contains all partition keys of the existing relations. + */ + AttributeEquivalenceClass *commonEquivalenceClass = GenerateCommonEquivalence( + attributeEquivalenceList, + restrictionContext, + firstEquivalenceClass); - commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity); - } + Relids commonRteIdentities = NULL; - /* check whether all relations exists in the main restriction list */ - foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) - { - RelationRestriction *relationRestriction = - (RelationRestriction *) lfirst(relationRestrictionCell); - int rteIdentity = GetRTEIdentity(relationRestriction->rte); + AttributeEquivalenceClassMember *classMember = NULL; - /* we shouldn't check for the equality of non-distributed tables */ - if (IsCitusTableType(relationRestriction->relationId, - CITUS_TABLE_WITH_NO_DIST_KEY)) + /* add the rte indexes of relations to a bitmap */ + foreach_ptr(classMember, commonEquivalenceClass->equivalentAttributes) { - continue; + int rteIdentity = classMember->rteIdentity; + + commonRteIdentities = bms_add_member(commonRteIdentities, rteIdentity); } - if (!bms_is_member(rteIdentity, commonRteIdentities)) + RelationRestriction *relationRestriction = NULL; + + /* check whether all relations exists in the main restriction list */ + foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) { - return false; + int rteIdentity = GetRTEIdentity(relationRestriction->rte); + + /* we shouldn't check for the equality of non-distributed tables */ + if (IsCitusTableType(relationRestriction->relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) + { + continue; + } + + if (!bms_is_member(rteIdentity, commonRteIdentities)) + { + return false; + } } } @@ -928,7 +937,8 @@ SearchPlannerParamList(List *plannerParamList, Param *plannerParam) */ static AttributeEquivalenceClass * GenerateCommonEquivalence(List *attributeEquivalenceList, - RelationRestrictionContext *relationRestrictionContext) + RelationRestrictionContext *relationRestrictionContext, + AttributeEquivalenceClass *firstEquivalenceClass) { Bitmapset *addedEquivalenceIds = NULL; uint32 equivalenceListSize = list_length(attributeEquivalenceList); @@ -938,14 +948,6 @@ GenerateCommonEquivalence(List *attributeEquivalenceList, sizeof(AttributeEquivalenceClass)); commonEquivalenceClass->equivalenceId = 0; - /* - * We seed the common equivalence class with a the first distributed - * table since we always want the input distributed relations to be - * on the common class. - */ - AttributeEquivalenceClass *firstEquivalenceClass = - GenerateEquivalenceClassForRelationRestriction(relationRestrictionContext); - /* we skip the calculation if there are not enough information */ if (equivalenceListSize < 1 || firstEquivalenceClass == NULL) { @@ -989,9 +991,9 @@ GenerateCommonEquivalence(List *attributeEquivalenceList, ListConcatUniqueAttributeClassMemberLists(commonEquivalenceClass, currentEquivalenceClass); - addedEquivalenceIds = bms_add_member(addedEquivalenceIds, - currentEquivalenceClass-> - equivalenceId); + addedEquivalenceIds = bms_add_member( + addedEquivalenceIds, + currentEquivalenceClass->equivalenceId); /* * It seems inefficient to start from the beginning. @@ -1019,27 +1021,38 @@ GenerateCommonEquivalence(List *attributeEquivalenceList, /* - * GenerateEquivalenceClassForRelationRestriction generates an AttributeEquivalenceClass - * with a single AttributeEquivalenceClassMember. + * GenerateEquivalenceClassesForRelationRestriction generates a list of + * AttributeEquivalenceClasses that all have a single + * AttributeEquivalenceClassMember. The list contains one + * AttributeEquivalenceClass for of each of the distribution columns of the + * first distributed relation in the relationRestrictionContext. */ -static AttributeEquivalenceClass * -GenerateEquivalenceClassForRelationRestriction( +static List * +GenerateEquivalenceClassesForRelationRestriction( RelationRestrictionContext *relationRestrictionContext) { ListCell *relationRestrictionCell = NULL; - AttributeEquivalenceClassMember *eqMember = NULL; - AttributeEquivalenceClass *eqClassForRelation = NULL; foreach(relationRestrictionCell, relationRestrictionContext->relationRestrictionList) { RelationRestriction *relationRestriction = (RelationRestriction *) lfirst(relationRestrictionCell); - Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId); + List *relationPartitionKeyList = DistPartitionKeys( + relationRestriction->relationId); - if (relationPartitionKey) + if (list_length(relationPartitionKeyList) == 0) { - eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass)); - eqMember = palloc0(sizeof(AttributeEquivalenceClassMember)); + continue; + } + + List *equivalenceClasses = NIL; + Var *relationPartitionKey = NULL; + foreach_ptr(relationPartitionKey, relationPartitionKeyList) + { + AttributeEquivalenceClass *eqClassForRelation = palloc0( + sizeof(AttributeEquivalenceClass)); + AttributeEquivalenceClassMember *eqMember = palloc0( + sizeof(AttributeEquivalenceClassMember)); eqMember->relationId = relationRestriction->relationId; eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte); eqMember->varno = relationRestriction->index; @@ -1047,12 +1060,12 @@ GenerateEquivalenceClassForRelationRestriction( eqClassForRelation->equivalentAttributes = lappend(eqClassForRelation->equivalentAttributes, eqMember); - - break; + equivalenceClasses = lappend(equivalenceClasses, eqClassForRelation); } + return equivalenceClasses; } - return eqClassForRelation; + return NIL; } @@ -1532,18 +1545,12 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass * return; } - Var *relationPartitionKey = DistPartitionKey(relationId); + List *relationPartitionKeyList = DistPartitionKeys(relationId); Assert(rangeTableEntry->rtekind == RTE_RELATION); - /* we don't need reference tables in the equality on columns */ - if (relationPartitionKey == NULL) - { - return; - } - /* we're only interested in distribution columns */ - if (relationPartitionKey->varattno != varToBeAdded->varattno) + if (!HasVarattnoInVarList(relationPartitionKeyList, varToBeAdded->varattno)) { return; } @@ -1562,6 +1569,21 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass * } +static bool +HasVarattnoInVarList(List *varList, AttrNumber varattno) +{ + Var *var = NULL; + foreach_ptr(var, varList) + { + if (var->varattno == varattno) + { + return true; + } + } + return false; +} + + /* * AttributeClassContainsAttributeClassMember returns true if it the input class member * is already exists in the attributeEquivalenceClass. An equality is identified by the diff --git a/src/test/regress/expected/multi_column_distribution.out b/src/test/regress/expected/multi_column_distribution.out index c3998c7e2..7a182b9d4 100644 --- a/src/test/regress/expected/multi_column_distribution.out +++ b/src/test/regress/expected/multi_column_distribution.out @@ -24,12 +24,20 @@ select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none (1 row) +create table t3(id int, id2 int, b int); +select create_distributed_table('t3', ARRAY['id', 'id2'], colocate_with := 't2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys --------------------------------------------------------------------- t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}"} t2 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"} -(2 rows) + t3 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"} +(3 rows) INSERT INTO t2 VALUES (1, 1, 1), @@ -39,6 +47,11 @@ INSERT INTO t2 VALUES (2, 3, 5), (2, 4, 5) ; +INSERT INTO t3 VALUES +(1, 1, 1), +(2, 2, 2), +(2, 4, 4) +; -- partitioning by both distribution columns pushes the window function down SELECT id, id2, a, rnk FROM ( @@ -191,5 +204,114 @@ ORDER BY -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20) (20 rows) +-- Can pushdown if joining on both distribution columns +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + id | id2 | a | b +--------------------------------------------------------------------- + 1 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 4 | 1 + 2 | 4 | 5 | 4 +(4 rows) + +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + QUERY PLAN +--------------------------------------------------------------------- + Sort (cost=8304.82..8554.82 rows=100000 width=16) + Sort Key: remote_scan.id, remote_scan.id2, remote_scan.a, remote_scan.b + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge Join (cost=285.08..316.72 rows=104 width=16) + Merge Cond: ((t2.id = t3.id) AND (t2.id2 = t3.id2)) + -> Sort (cost=142.54..147.64 rows=2040 width=12) + Sort Key: t2.id, t2.id2 + -> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12) + -> Sort (cost=142.54..147.64 rows=2040 width=12) + Sort Key: t3.id, t3.id2 + -> Seq Scan on t3_27905508 t3 (cost=0.00..30.40 rows=2040 width=12) +(15 rows) + +-- Cannot pushdown if not joining on both distribution columns +-- NOTE: This currently returns a result, because the logical planner will take +-- over and that doesn't know about. In the EXPLAIN below you can see that it's +-- not pushed down. +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id +) foo +ORDER BY 1, 2, 3, 4; + id | id2 | a | b +--------------------------------------------------------------------- + 1 | 1 | 1 | 1 + 1 | 1 | 2 | 1 + 1 | 1 | 4 | 1 + 2 | 3 | 4 | 2 + 2 | 3 | 4 | 4 + 2 | 3 | 5 | 2 + 2 | 3 | 5 | 4 + 2 | 4 | 5 | 2 + 2 | 4 | 5 | 4 +(9 rows) + +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id +) foo +ORDER BY 1, 2, 3, 4; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge Join (cost=285.08..607.40 rows=20808 width=16) + Merge Cond: (t2.id = t3.id) + -> Sort (cost=142.54..147.64 rows=2040 width=12) + Sort Key: t2.id + -> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12) + -> Sort (cost=142.54..147.64 rows=2040 width=8) + Sort Key: t3.id + -> Seq Scan on t3_27905508 t3 (cost=0.00..30.40 rows=2040 width=8) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Sort (cost=59.83..62.33 rows=1000 width=16) + Sort Key: intermediate_result.id, intermediate_result.id2, intermediate_result.a, intermediate_result.b + -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=16) +(22 rows) + +-- Cannot pushdown if not joining on both distribution columns +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET client_min_messages TO WARNING; DROP SCHEMA multi_column_distribution CASCADE; diff --git a/src/test/regress/sql/multi_column_distribution.sql b/src/test/regress/sql/multi_column_distribution.sql index fdd88cde1..a18aa1634 100644 --- a/src/test/regress/sql/multi_column_distribution.sql +++ b/src/test/regress/sql/multi_column_distribution.sql @@ -11,6 +11,9 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); create table t2(id int, id2 int, a int); select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none'); + +create table t3(id int, id2 int, b int); +select create_distributed_table('t3', ARRAY['id', 'id2'], colocate_with := 't2'); select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); INSERT INTO t2 VALUES @@ -22,6 +25,12 @@ INSERT INTO t2 VALUES (2, 4, 5) ; +INSERT INTO t3 VALUES +(1, 1, 1), +(2, 2, 2), +(2, 4, 4) +; + -- partitioning by both distribution columns pushes the window function down SELECT id, id2, a, rnk FROM ( @@ -86,5 +95,51 @@ ORDER BY rnk, id, id2, a; +-- Can pushdown if joining on both distribution columns +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id AND t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + +-- Cannot pushdown if not joining on both distribution columns +-- NOTE: This currently returns a result, because the logical planner will take +-- over and that doesn't know about. In the EXPLAIN below you can see that it's +-- not pushed down. +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id +) foo +ORDER BY 1, 2, 3, 4; + +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id = t3.id +) foo +ORDER BY 1, 2, 3, 4; + +-- Cannot pushdown if not joining on both distribution columns +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + +EXPLAIN +SELECT * FROM ( + SELECT t2.id, t2.id2, a, b + FROM t2 JOIN t3 ON t2.id2 = t3.id2 +) foo +ORDER BY 1, 2, 3, 4; + + SET client_min_messages TO WARNING; DROP SCHEMA multi_column_distribution CASCADE;