TargetListOnPartitionColumns

multi-column-distribution
Jelte Fennema 2021-06-09 13:57:07 +02:00
parent bb26c3b9f7
commit 5f748eeea0
9 changed files with 361 additions and 26 deletions

View File

@ -29,6 +29,10 @@
- [x] BuildCitusTableCacheEntry()
- [ ] PostprocessCreateTableStmtPartitionOf()
- [ ] PostprocessAlterTableStmtAttachPartition()
- [x] TargetListOnPartitionColumn()
- [ ] PartitionColumnForPushedDownSubquery()
- [ ] RestrictionEquivalenceForPartitionKeys()
- [ ] SafeToPushdownUnionSubquery()
- [ ] PartitionMethod()

View File

@ -1054,8 +1054,8 @@ MultiTaskRouterSelectQuerySupported(Query *query)
List *targetEntryList = subquery->targetList;
List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
targetEntryList);
bool groupOnPartitionColumn = TargetListOnPartitionColumn(subquery,
groupTargetEntryList);
bool groupOnPartitionColumn = TargetListOnPartitionColumns(subquery,
groupTargetEntryList);
if (!groupOnPartitionColumn)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,

View File

@ -4375,6 +4375,25 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList)
*/
bool
IsPartitionColumn(Expr *columnExpression, Query *query)
{
return IsFirstPartitionColumn(columnExpression, query);
}
/*
* IsFirstPartitionColumn returns true if the given column is the first
* partition column of a table. The function uses FindReferencedTableColumn to
* find the original relation id and column that the column expression refers
* to. It then checks whether that column is a partition column of the
* relation.
*
* Also, the function returns always false for reference tables given that
* reference tables do not have partition column. The function does not
* support queries with CTEs, it would return false if columnExpression
* refers to a column returned by a CTE.
*/
bool
IsFirstPartitionColumn(Expr *columnExpression, Query *query)
{
bool isPartitionColumn = false;
Oid relationId = InvalidOid;

View File

@ -199,12 +199,75 @@ FindNodeMatchingCheckFunction(Node *node, CheckNodeFunc checker)
}
static bool
TargetListOnPartitionColumnsOfRelation(Query *query, List *targetEntryList, Oid
relationId)
{
List *partitionColumnList = DistPartitionKeys(relationId);
/*
* TODO: Rename function, make this configurable, or remove this
*/
bool skippedFirst = false;
Var *partitionColumn = NULL;
foreach_ptr(partitionColumn, partitionColumnList)
{
if (!skippedFirst)
{
skippedFirst = true;
continue;
}
TargetEntry *targetEntry = NULL;
bool foundColumn = false;
foreach_ptr(targetEntry, targetEntryList)
{
Var *column = NULL;
Expr *targetExpression = targetEntry->expr;
Oid expressionRelationId = InvalidOid;
FindReferencedTableColumn(targetExpression, NIL, query, &expressionRelationId,
&column);
if (expressionRelationId != relationId || column == NULL)
{
continue;
}
if (column->varattno != partitionColumn->varattno)
{
continue;
}
FieldSelect *compositeField = CompositeFieldRecursive(
targetExpression,
query);
if (compositeField)
{
/*
* TODO: Ensure that multi column distribution tables cannot be
* created in create_distributed_table with composite fields as
* secondary distribution columns.
*/
ereport(ERROR, (errmsg(
"composite fields are not supported as secondary distribution columns")));
}
foundColumn = true;
break;
}
if (!foundColumn)
{
return false;
}
}
return true;
}
/*
* TargetListOnPartitionColumn checks if at least one target list entry is on
* partition column.
* TargetListOnPartitionColumn checks if each of the partition columns has at
* least one matching entry in the target list.
*/
bool
TargetListOnPartitionColumn(Query *query, List *targetEntryList)
TargetListOnPartitionColumns(Query *query, List *targetEntryList)
{
List *compositeFieldList = NIL;
@ -212,8 +275,6 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
foreach_ptr(targetEntry, targetEntryList)
{
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
Oid relationId = InvalidOid;
Var *column = NULL;
@ -228,18 +289,35 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
continue;
}
if (isPartitionColumn)
/*
* TODO: Maybe pass relationId to IsFirstPartitionColumn, so it doesn't
* have to look it up again.
*/
if (!IsFirstPartitionColumn(targetExpression, query))
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
query);
if (compositeField)
{
compositeFieldList = lappend(compositeFieldList, compositeField);
}
else
{
return true;
}
continue;
}
/*
* This is the first partition column of the table. Now we check if all
* other partition colums of this table are referenced in the target
* list.
*/
if (!TargetListOnPartitionColumnsOfRelation(query, targetEntryList, relationId))
{
continue;
}
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
query);
if (compositeField)
{
compositeFieldList = lappend(compositeFieldList, compositeField);
}
else
{
return true;
}
}

View File

@ -466,7 +466,7 @@ WindowPartitionOnDistributionColumn(Query *query)
GroupTargetEntryList(partitionClauseList, targetEntryList);
bool partitionOnDistributionColumn =
TargetListOnPartitionColumn(query, groupTargetEntryList);
TargetListOnPartitionColumns(query, groupTargetEntryList);
if (!partitionOnDistributionColumn)
{
@ -1056,7 +1056,7 @@ DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree)
List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
targetEntryList);
bool groupOnPartitionColumn =
TargetListOnPartitionColumn(subqueryTree, groupTargetEntryList);
TargetListOnPartitionColumns(subqueryTree, groupTargetEntryList);
if (!groupOnPartitionColumn)
{
preconditionsSatisfied = false;
@ -1103,7 +1103,7 @@ DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree)
List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList,
targetEntryList);
bool distinctOnPartitionColumn =
TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList);
TargetListOnPartitionColumns(subqueryTree, distinctTargetEntryList);
if (!distinctOnPartitionColumn)
{
preconditionsSatisfied = false;

View File

@ -172,6 +172,7 @@ extern List * SubqueryMultiTableList(MultiNode *multiNode);
extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList);
extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern bool IsFirstPartitionColumn(Expr *columnExpression, Query *query);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);
extern char * WorkerColumnName(AttrNumber resno);

View File

@ -192,7 +192,7 @@ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *query
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool FindNodeMatchingCheckFunction(Node *node, bool (*check)(Node *));
extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
extern bool TargetListOnPartitionColumns(Query *query, List *targetEntryList);
extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*check)(
Node *));
extern bool IsCitusTableRTE(Node *node);

View File

@ -17,8 +17,8 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}"}
(1 row)
create table t2(id int, a int);
select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none');
create table t2(id int, id2 int, a int);
select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
@ -31,5 +31,165 @@ select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
t2 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"}
(2 rows)
INSERT INTO t2 VALUES
(1, 1, 1),
(1, 1, 2),
(1, 1, 4),
(2, 3, 4),
(2, 3, 5),
(2, 4, 5)
;
-- partitioning by both distribution columns pushes the window function down
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
id | id2 | a | rnk
---------------------------------------------------------------------
1 | 1 | 1 | 1
2 | 3 | 4 | 1
2 | 4 | 5 | 1
1 | 1 | 2 | 2
2 | 3 | 5 | 2
1 | 1 | 4 | 3
(6 rows)
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
QUERY PLAN
---------------------------------------------------------------------
Sort (cost=10355.82..10605.82 rows=100000 width=20)
Sort Key: remote_scan.rnk, remote_scan.id, remote_scan.id2, remote_scan.a
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=20)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (cost=208.84..210.88 rows=204 width=20)
Group Key: t2.id, t2.id2, t2.a, rank() OVER (?)
-> WindowAgg (cost=142.54..188.44 rows=2040 width=20)
-> Sort (cost=142.54..147.64 rows=2040 width=12)
Sort Key: t2.id, t2.id2, t2.a
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
(13 rows)
-- partitioning by one of the distribution causes the window function not to be
-- pushed down.
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
id | id2 | a | rnk
---------------------------------------------------------------------
1 | 1 | 1 | 1
2 | 3 | 4 | 1
1 | 1 | 2 | 2
2 | 3 | 5 | 2
2 | 4 | 5 | 2
1 | 1 | 4 | 3
(6 rows)
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan XXX_1
-> HashAggregate (cost=11304.82..11306.82 rows=200 width=20)
Group Key: remote_scan.id, remote_scan.id2, remote_scan.a, rank() OVER (?)
-> WindowAgg (cost=8304.82..10304.82 rows=100000 width=20)
-> Sort (cost=8304.82..8554.82 rows=100000 width=12)
Sort Key: remote_scan.id, remote_scan.a
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Sort (cost=59.83..62.33 rows=1000 width=20)
Sort Key: intermediate_result.rnk, intermediate_result.id, intermediate_result.id2, intermediate_result.a
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20)
(20 rows)
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
id | id2 | a | rnk
---------------------------------------------------------------------
1 | 1 | 1 | 1
2 | 3 | 4 | 1
2 | 4 | 5 | 1
1 | 1 | 2 | 2
2 | 3 | 5 | 2
1 | 1 | 4 | 3
(6 rows)
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan XXX_1
-> HashAggregate (cost=11304.82..11306.82 rows=200 width=20)
Group Key: remote_scan.id, remote_scan.id2, remote_scan.a, rank() OVER (?)
-> WindowAgg (cost=8304.82..10304.82 rows=100000 width=20)
-> Sort (cost=8304.82..8554.82 rows=100000 width=12)
Sort Key: remote_scan.id2, remote_scan.a
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on t2_27905504 t2 (cost=0.00..30.40 rows=2040 width=12)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Sort (cost=59.83..62.33 rows=1000 width=20)
Sort Key: intermediate_result.rnk, intermediate_result.id, intermediate_result.id2, intermediate_result.a
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=20)
(20 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA multi_column_distribution CASCADE;

View File

@ -9,9 +9,82 @@ create table t(id int, a int);
select create_distributed_table('t', ARRAY['id'], colocate_with := 'none');
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
create table t2(id int, a int);
select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none');
create table t2(id int, id2 int, a int);
select create_distributed_table('t2', ARRAY['id', 'id2'], colocate_with := 'none');
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
INSERT INTO t2 VALUES
(1, 1, 1),
(1, 1, 2),
(1, 1, 4),
(2, 3, 4),
(2, 3, 5),
(2, 4, 5)
;
-- partitioning by both distribution columns pushes the window function down
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id, id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
-- partitioning by one of the distribution causes the window function not to be
-- pushed down.
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
EXPLAIN SELECT id, id2, a, rnk FROM
(
SELECT
DISTINCT id, id2, a, rank() OVER (PARTITION BY id2 ORDER BY a) as rnk
FROM
t2
) as foo
ORDER BY
rnk, id, id2, a;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_column_distribution CASCADE;