diff --git a/multi_column.txt b/multi_column.txt index 5adad9378..a56a34db9 100644 --- a/multi_column.txt +++ b/multi_column.txt @@ -29,6 +29,10 @@ - [x] BuildCitusTableCacheEntry() - [ ] PostprocessCreateTableStmtPartitionOf() - [ ] PostprocessAlterTableStmtAttachPartition() +- [x] TargetListOnPartitionColumn() +- [ ] PartitionColumnForPushedDownSubquery() +- [ ] RestrictionEquivalenceForPartitionKeys() +- [ ] SafeToPushdownUnionSubquery() - [ ] PartitionMethod() diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 137e19fc7..e41ddac36 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1054,8 +1054,8 @@ MultiTaskRouterSelectQuerySupported(Query *query) List *targetEntryList = subquery->targetList; List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, targetEntryList); - bool groupOnPartitionColumn = TargetListOnPartitionColumn(subquery, - groupTargetEntryList); + bool groupOnPartitionColumn = TargetListOnPartitionColumns(subquery, + groupTargetEntryList); if (!groupOnPartitionColumn) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 615c0ddbe..60460514e 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -4375,6 +4375,25 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList) */ bool IsPartitionColumn(Expr *columnExpression, Query *query) +{ + return IsFirstPartitionColumn(columnExpression, query); +} + + +/* + * IsFirstPartitionColumn returns true if the given column is the first + * partition column of a table. The function uses FindReferencedTableColumn to + * find the original relation id and column that the column expression refers + * to. 
It then checks whether that column is a partition column of the + * relation. + * + * Also, the function always returns false for reference tables, given that + * reference tables do not have a partition column. The function does not + * support queries with CTEs; it would return false if columnExpression + * refers to a column returned by a CTE. + */ +bool +IsFirstPartitionColumn(Expr *columnExpression, Query *query) { bool isPartitionColumn = false; Oid relationId = InvalidOid; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index c7e441e06..6a02baf9f 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -199,12 +199,75 @@ FindNodeMatchingCheckFunction(Node *node, CheckNodeFunc checker) } +static bool +TargetListOnPartitionColumnsOfRelation(Query *query, List *targetEntryList, Oid + relationId) +{ + List *partitionColumnList = DistPartitionKeys(relationId); + + /* + * TODO: Rename function, make this configurable, or remove this + */ + bool skippedFirst = false; + Var *partitionColumn = NULL; + foreach_ptr(partitionColumn, partitionColumnList) + { + if (!skippedFirst) + { + skippedFirst = true; + continue; + } + + TargetEntry *targetEntry = NULL; + bool foundColumn = false; + foreach_ptr(targetEntry, targetEntryList) + { + Var *column = NULL; + Expr *targetExpression = targetEntry->expr; + Oid expressionRelationId = InvalidOid; + FindReferencedTableColumn(targetExpression, NIL, query, &expressionRelationId, + &column); + + if (expressionRelationId != relationId || column == NULL) + { + continue; + } + + if (column->varattno != partitionColumn->varattno) + { + continue; + } + FieldSelect *compositeField = CompositeFieldRecursive( + targetExpression, + query); + if (compositeField) + { + /* + * TODO: Ensure that multi column distribution tables cannot be + * created in create_distributed_table with composite 
fields as + * secondary distribution columns. + */ + ereport(ERROR, (errmsg( + "composite fields are not supported as secondary distribution columns"))); + } + foundColumn = true; + break; + } + if (!foundColumn) + { + return false; + } + } + return true; +} + + /* - * TargetListOnPartitionColumn checks if at least one target list entry is on - * partition column. + * TargetListOnPartitionColumns checks if each of the partition columns has at + * least one matching entry in the target list. */ bool -TargetListOnPartitionColumn(Query *query, List *targetEntryList) +TargetListOnPartitionColumns(Query *query, List *targetEntryList) { List *compositeFieldList = NIL; @@ -212,8 +275,6 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) foreach_ptr(targetEntry, targetEntryList) { Expr *targetExpression = targetEntry->expr; - - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); Oid relationId = InvalidOid; Var *column = NULL; @@ -228,18 +289,35 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) continue; } - if (isPartitionColumn) + /* + * TODO: Maybe pass relationId to IsFirstPartitionColumn, so it doesn't + * have to look it up again. + */ + if (!IsFirstPartitionColumn(targetExpression, query)) { - FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, - query); - if (compositeField) - { - compositeFieldList = lappend(compositeFieldList, compositeField); - } - else - { - return true; - } + continue; + } + + /* + * This is the first partition column of the table. Now we check if all + * other partition columns of this table are referenced in the target + * list. 
+ */ + + if (!TargetListOnPartitionColumnsOfRelation(query, targetEntryList, relationId)) + { + continue; + } + + FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, + query); + if (compositeField) + { + compositeFieldList = lappend(compositeFieldList, compositeField); + } + else + { + return true; } } diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..babe5ec99 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -466,7 +466,7 @@ WindowPartitionOnDistributionColumn(Query *query) GroupTargetEntryList(partitionClauseList, targetEntryList); bool partitionOnDistributionColumn = - TargetListOnPartitionColumn(query, groupTargetEntryList); + TargetListOnPartitionColumns(query, groupTargetEntryList); if (!partitionOnDistributionColumn) { @@ -1056,7 +1056,7 @@ DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree) List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, targetEntryList); bool groupOnPartitionColumn = - TargetListOnPartitionColumn(subqueryTree, groupTargetEntryList); + TargetListOnPartitionColumns(subqueryTree, groupTargetEntryList); if (!groupOnPartitionColumn) { preconditionsSatisfied = false; @@ -1103,7 +1103,7 @@ DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree) List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList, targetEntryList); bool distinctOnPartitionColumn = - TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList); + TargetListOnPartitionColumns(subqueryTree, distinctTargetEntryList); if (!distinctOnPartitionColumn) { preconditionsSatisfied = false; diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 9e6167959..81f43ae37 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ 
b/src/include/distributed/multi_logical_optimizer.h @@ -172,6 +172,7 @@ extern List * SubqueryMultiTableList(MultiNode *multiNode); extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList); extern bool ExtractQueryWalker(Node *node, List **queryList); extern bool IsPartitionColumn(Expr *columnExpression, Query *query); +extern bool IsFirstPartitionColumn(Expr *columnExpression, Query *query); extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query, Oid *relationId, Var **column); extern char * WorkerColumnName(AttrNumber resno); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index a7a8ce0ff..7bf32461c 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -192,7 +192,7 @@ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *query PlannerRestrictionContext * plannerRestrictionContext); extern bool FindNodeMatchingCheckFunction(Node *node, bool (*check)(Node *)); -extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); +extern bool TargetListOnPartitionColumns(Query *query, List *targetEntryList); extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*check)( Node *)); extern bool IsCitusTableRTE(Node *node); diff --git a/src/test/regress/expected/multi_column_distribution.out b/src/test/regress/expected/multi_column_distribution.out index 28aed8859..c3998c7e2 100644 --- a/src/test/regress/expected/multi_column_distribution.out +++ b/src/test/regress/expected/multi_column_distribution.out @@ -17,8 +17,8 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 
1 :varoattno 1 :location -1}"} (1 row) -create table t2(id int, a int); -select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none'); +create table t2(id int, id2 int, a int); +select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none'); create_distributed_table --------------------------------------------------------------------- @@ -31,5 +31,165 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); t2 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"} (2 rows) +INSERT INTO t2 VALUES +(1, 1, 1), +(1, 1, 2), +(1, 1, 4), +(2, 3, 4), +(2, 3, 5), +(2, 4, 5) +; +-- partitioning by both distribution columns pushes the window function down +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + id | id2 | a | rnk +--------------------------------------------------------------------- + 1 | 1 | 1 | 1 + 2 | 3 | 4 | 1 + 2 | 4 | 5 | 1 + 1 | 1 | 2 | 2 + 2 | 3 | 5 | 2 + 1 | 1 | 4 | 3 +(6 rows) + +EXPLAIN SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + QUERY PLAN +--------------------------------------------------------------------- + Sort (cost=10355.82..10605.82 rows=100000 width=20) + Sort Key: remote_scan.rnk, remote_scan.id, remote_scan.id2, remote_scan.a + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=20) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate 
(cost=208.84..210.88 rows=204 width=20) + Group Key: t2.id, t2.id2, t2.a, rank() OVER (?) + -> WindowAgg (cost=142.54..188.44 rows=2040 width=20) + -> Sort (cost=142.54..147.64 rows=2040 width=12) + Sort Key: t2.id, t2.id2, t2.a + -> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12) +(13 rows) + +-- partitioning by one of the distribution causes the window function not to be +-- pushed down. +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + id | id2 | a | rnk +--------------------------------------------------------------------- + 1 | 1 | 1 | 1 + 2 | 3 | 4 | 1 + 1 | 1 | 2 | 2 + 2 | 3 | 5 | 2 + 2 | 4 | 5 | 2 + 1 | 1 | 4 | 3 +(6 rows) + +EXPLAIN SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Distributed Subplan XXX_1 + -> HashAggregate (cost=11304.82..11306.82 rows=200 width=20) + Group Key: remote_scan.id, remote_scan.id2, remote_scan.a, rank() OVER (?) 
+ -> WindowAgg (cost=8304.82..10304.82 rows=100000 width=20) + -> Sort (cost=8304.82..8554.82 rows=100000 width=12) + Sort Key: remote_scan.id, remote_scan.a + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Sort (cost=59.83..62.33 rows=1000 width=20) + Sort Key: intermediate_result.rnk, intermediate_result.id, intermediate_result.id2, intermediate_result.a + -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20) +(20 rows) + +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + id | id2 | a | rnk +--------------------------------------------------------------------- + 1 | 1 | 1 | 1 + 2 | 3 | 4 | 1 + 2 | 4 | 5 | 1 + 1 | 1 | 2 | 2 + 2 | 3 | 5 | 2 + 1 | 1 | 4 | 3 +(6 rows) + +EXPLAIN SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Distributed Subplan XXX_1 + -> HashAggregate (cost=11304.82..11306.82 rows=200 width=20) + Group Key: remote_scan.id, remote_scan.id2, remote_scan.a, rank() OVER (?) 
+ -> WindowAgg (cost=8304.82..10304.82 rows=100000 width=20) + -> Sort (cost=8304.82..8554.82 rows=100000 width=12) + Sort Key: remote_scan.id2, remote_scan.a + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Sort (cost=59.83..62.33 rows=1000 width=20) + Sort Key: intermediate_result.rnk, intermediate_result.id, intermediate_result.id2, intermediate_result.a + -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20) +(20 rows) + SET client_min_messages TO WARNING; DROP SCHEMA multi_column_distribution CASCADE; diff --git a/src/test/regress/sql/multi_column_distribution.sql b/src/test/regress/sql/multi_column_distribution.sql index 810b01fc3..fdd88cde1 100644 --- a/src/test/regress/sql/multi_column_distribution.sql +++ b/src/test/regress/sql/multi_column_distribution.sql @@ -9,9 +9,82 @@ create table t(id int, a int); select create_distributed_table('t', ARRAY['id'], colocate_with := 'none'); select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); -create table t2(id int, a int); -select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none'); +create table t2(id int, id2 int, a int); +select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none'); select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%'); +INSERT INTO t2 VALUES +(1, 1, 1), +(1, 1, 2), +(1, 1, 4), +(2, 3, 4), +(2, 3, 5), +(2, 4, 5) +; + +-- partitioning by both distribution columns pushes the window function down +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + +EXPLAIN 
SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + +-- partitioning by one of the distribution causes the window function not to be +-- pushed down. +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + +EXPLAIN SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + +SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + +EXPLAIN SELECT id, id2, a, rnk FROM +( + SELECT + DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk + FROM + t2 +) as foo +ORDER BY + rnk, id, id2, a; + + SET client_min_messages TO WARNING; DROP SCHEMA multi_column_distribution CASCADE;