Add view support

Enables use views within distributed queries.
User can create and use a view on distributed tables/queries
as he/she would use with regular queries.

After this change router queries will have full support for views,
insert into select queries will support reading from views, not
writing into. Outer joins would have a limited support, and would
error out at certain cases such as when a view is in the inner side
of the outer join.

Although PostgreSQL supports writing into views under certain circumstances.
We disallowed that for distributed views.
pull/1024/head
Murat Tuncer 2016-12-01 15:56:35 +03:00
parent feef1bc70b
commit 77f8db6b14
16 changed files with 657 additions and 64 deletions

View File

@ -3280,7 +3280,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumnRecursive(targetExpression, query);
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
if (isPartitionColumn)
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
@ -3312,24 +3312,52 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
/*
* IsPartitionColumnRecursive recursively checks if the given column is a partition
* column. If a column is referenced from a regular table, we directly check if
* it is a partition column. If a column is referenced from a subquery, then we
* recursively check that subquery until we reach the source of that column, and
* verify this column is a partition column. If a column is referenced from a
* join range table entry, then we resolve which join column it refers and
* recursively check this column with the same query.
* IsPartitionColumn returns true if the given column is a partition column.
* The function uses FindReferencedTableColumn to find the original relation
* id and column that the column expression refers to. It then checks whether
* that column is a partition column of the relation.
*
* Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column.
*
* Also, the function returns always false for reference tables given that reference
* tables do not have partition column.
* Also, the function returns always false for reference tables given that
* reference tables do not have partition column. The function does not
* support queries with CTEs, it would return false if columnExpression
* refers to a column returned by a CTE.
*/
bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
IsPartitionColumn(Expr *columnExpression, Query *query)
{
bool isPartitionColumn = false;
Oid relationId = InvalidOid;
Var *column = NULL;
FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column);
if (relationId != InvalidOid && column != NULL)
{
Var *partitionColumn = PartitionKey(relationId);
/* not all distributed tables have partition column */
if (partitionColumn != NULL && column->varattno == partitionColumn->varattno)
{
isPartitionColumn = true;
}
}
return isPartitionColumn;
}
/*
* FindReferencedTableColumn recursively traverses query tree to find actual relation
* id, and column that columnExpression refers to. If columnExpression is a
* non-relational or computed/derived expression, the function returns InvolidOid for
* relationId and NULL for column. The caller should provide parent query list from
* top of the tree to this particular Query's parent. This argument is used to look
* into CTEs that may be present in the query.
*/
void
FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query,
Oid *relationId, Var **column)
{
Var *candidateColumn = NULL;
List *rangetableList = query->rtable;
Index rangeTableEntryIndex = 0;
@ -3337,6 +3365,9 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions(
(Node *) columnExpression);
*relationId = InvalidOid;
*column = NULL;
if (IsA(strippedColumnExpression, Var))
{
candidateColumn = (Var *) strippedColumnExpression;
@ -3350,17 +3381,11 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
{
candidateColumn = (Var *) fieldExpression;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Only references to column fields are supported")));
}
}
if (candidateColumn == NULL)
{
return false;
return;
}
rangeTableEntryIndex = candidateColumn->varno - 1;
@ -3368,18 +3393,8 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
if (rangeTableEntry->rtekind == RTE_RELATION)
{
Oid relationId = rangeTableEntry->relid;
Var *partitionColumn = PartitionKey(relationId);
/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
isPartitionColumn = false;
}
else if (candidateColumn->varattno == partitionColumn->varattno)
{
isPartitionColumn = true;
}
*relationId = rangeTableEntry->relid;
*column = candidateColumn;
}
else if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
@ -3387,9 +3402,12 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
List *targetEntryList = subquery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex);
Expr *subColumnExpression = subqueryTargetEntry->expr;
Expr *subqueryExpression = subqueryTargetEntry->expr;
isPartitionColumn = IsPartitionColumnRecursive(subqueryExpression, subquery);
/* append current query to parent query list */
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(subColumnExpression, parentQueryList,
subquery, relationId, column);
}
else if (rangeTableEntry->rtekind == RTE_JOIN)
{
@ -3397,10 +3415,52 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
AttrNumber joinColumnIndex = candidateColumn->varattno - 1;
Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex);
isPartitionColumn = IsPartitionColumnRecursive(joinColumn, query);
/* parent query list stays the same since still in the same query boundary */
FindReferencedTableColumn(joinColumn, parentQueryList, query,
relationId, column);
}
else if (rangeTableEntry->rtekind == RTE_CTE)
{
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
List *cteList = NIL;
ListCell *cteListCell = NULL;
CommonTableExpr *cte = NULL;
return isPartitionColumn;
/*
* This should have been an error case, not marking it as error at the
* moment due to usage from IsPartitionColumn. Callers of that function
* do not have access to parent query list.
*/
if (cteParentListIndex >= 0)
{
cteParentQuery = list_nth(parentQueryList, cteParentListIndex);
cteList = cteParentQuery->cteList;
}
foreach(cteListCell, cteList)
{
CommonTableExpr *candidateCte = (CommonTableExpr *) lfirst(cteListCell);
if (strcmp(candidateCte->ctename, rangeTableEntry->ctename) == 0)
{
cte = candidateCte;
break;
}
}
if (cte != NULL)
{
Query *cteQuery = (Query *) cte->ctequery;
List *targetEntryList = cteQuery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(targetEntry->expr, parentQueryList,
cteQuery, relationId, column);
}
}
}
@ -3669,10 +3729,10 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery)
continue;
}
outerColumnIsPartitionColumn = IsPartitionColumnRecursive(outerQueryExpression,
parentQuery);
localColumnIsPartitionColumn = IsPartitionColumnRecursive(localQueryExpression,
lateralQuery);
outerColumnIsPartitionColumn = IsPartitionColumn(outerQueryExpression,
parentQuery);
localColumnIsPartitionColumn = IsPartitionColumn(localQueryExpression,
lateralQuery);
if (outerColumnIsPartitionColumn && localColumnIsPartitionColumn)
{
@ -3747,8 +3807,8 @@ JoinOnPartitionColumn(Query *query)
leftArgument = (Expr *) linitial(joinArgumentList);
rightArgument = (Expr *) lsecond(joinArgumentList);
isLeftColumnPartitionColumn = IsPartitionColumnRecursive(leftArgument, query);
isRightColumnPartitionColumn = IsPartitionColumnRecursive(rightArgument, query);
isLeftColumnPartitionColumn = IsPartitionColumn(leftArgument, query);
isRightColumnPartitionColumn = IsPartitionColumn(rightArgument, query);
if (isLeftColumnPartitionColumn && isRightColumnPartitionColumn)
{

View File

@ -16,6 +16,7 @@
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_class.h"
#include "commands/defrem.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
@ -301,7 +302,7 @@ MultiPlanTree(Query *queryTree)
* elements.
*/
joinClauseList = JoinClauseList(whereClauseList);
tableEntryList = TableEntryList(rangeTableList);
tableEntryList = UsedTableEntryList(queryTree);
/* build the list of multi table nodes */
tableNodeList = MultiTableNodeList(tableEntryList, rangeTableList);
@ -1095,6 +1096,38 @@ TableEntryList(List *rangeTableList)
}
/*
* UsedTableEntryList returns list of relation range table entries
* that are referenced within the query. Unused entries due to query
* flattening or re-rewriting are ignored.
*/
List *
UsedTableEntryList(Query *query)
{
List *tableEntryList = NIL;
List *rangeTableList = query->rtable;
List *joinTreeTableIndexList = NIL;
ListCell *joinTreeTableIndexCell = NULL;
ExtractRangeTableIndexWalker((Node *) query->jointree, &joinTreeTableIndexList);
foreach(joinTreeTableIndexCell, joinTreeTableIndexList)
{
int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell);
RangeTblEntry *rangeTableEntry = rt_fetch(joinTreeTableIndex, rangeTableList);
if (rangeTableEntry->rtekind == RTE_RELATION)
{
TableEntry *tableEntry = (TableEntry *) palloc0(sizeof(TableEntry));
tableEntry->relationId = rangeTableEntry->relid;
tableEntry->rangeTableId = joinTreeTableIndex;
tableEntryList = lappend(tableEntryList, tableEntry);
}
}
return tableEntryList;
}
/*
* MultiTableNodeList builds a list of MultiTable nodes from the given table
* entry list. A multi table node represents one entry from the range table
@ -1592,7 +1625,8 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind == RTE_RELATION)
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind != RELKIND_VIEW)
{
(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTableEntry);
}

View File

@ -496,7 +496,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
if (IsPartitionColumnRecursive(targetEntry->expr, subqery) &&
if (IsPartitionColumn(targetEntry->expr, subqery) &&
IsA(targetEntry->expr, Var))
{
targetPartitionColumnVar = (Var *) targetEntry->expr;
@ -641,12 +641,25 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
Oid selectPartitionColumnTableId = InvalidOid;
Oid targetRelationId = insertRte->relid;
char targetPartitionMethod = PartitionMethod(targetRelationId);
ListCell *rangeTableCell = NULL;
/* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree));
EnsureSchemaNode();
/* we do not expect to see a view in modify target */
foreach(rangeTableCell, queryTree->rtable)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_VIEW)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot insert into view over distributed table")));
}
}
subquery = subqueryRte->subquery;
if (contain_volatile_functions((Node *) queryTree))
@ -654,9 +667,8 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given "
"modification"),
errdetail(
"Volatile functions are not allowed in INSERT ... "
"SELECT queries")));
errdetail("Volatile functions are not allowed in "
"INSERT ... SELECT queries")));
}
/* we don't support LIMIT, OFFSET and WINDOW functions */
@ -821,6 +833,9 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
AttrNumber originalAttrNo = get_attnum(insertRelationId,
targetEntry->resname);
TargetEntry *subqeryTargetEntry = NULL;
Oid originalRelationId = InvalidOid;
Var *originalColumn = NULL;
List *parentQueryList = NIL;
if (originalAttrNo != insertPartitionColumn->varattno)
{
@ -836,25 +851,36 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
break;
}
/*
* Reference tables doesn't have a partition column, thus partition columns
* cannot match at all.
*/
if (PartitionMethod(subqeryTargetEntry->resorigtbl) == DISTRIBUTE_BY_NONE)
parentQueryList = list_make2(query, subquery);
FindReferencedTableColumn(subqeryTargetEntry->expr,
parentQueryList, subquery,
&originalRelationId,
&originalColumn);
if (originalRelationId == InvalidOid)
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery))
/*
* Reference tables doesn't have a partition column, thus partition columns
* cannot match at all.
*/
if (PartitionMethod(originalRelationId) == DISTRIBUTE_BY_NONE)
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumn(subqeryTargetEntry->expr, subquery))
{
partitionColumnsMatch = false;
break;
}
partitionColumnsMatch = true;
*selectPartitionColumnTableId = subqeryTargetEntry->resorigtbl;
*selectPartitionColumnTableId = originalRelationId;
break;
}
}
@ -916,7 +942,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery)
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
if (IsPartitionColumnRecursive(targetEntry->expr, subquery) &&
if (IsPartitionColumn(targetEntry->expr, subquery) &&
IsA(targetEntry->expr, Var))
{
targetPartitionColumnVar = (Var *) targetEntry->expr;
@ -1077,6 +1103,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
}
queryTableCount++;
/* we do not expect to see a view in modify query */
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot modify views over distributed tables")));
}
}
else if (rangeTableEntry->rtekind == RTE_VALUES)
{

View File

@ -6664,6 +6664,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (strcmp(refname, rte->ctename) != 0)
printalias = true;
}
else if (rte->rtekind == RTE_SUBQUERY)
{
/* subquery requires alias too */
printalias = true;
}
if (printalias)
appendStringInfo(buf, " %s", quote_identifier(refname));

View File

@ -6739,6 +6739,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (strcmp(refname, rte->ctename) != 0)
printalias = true;
}
else if (rte->rtekind == RTE_SUBQUERY)
{
/* subquery requires alias too */
printalias = true;
}
if (printalias)
appendStringInfo(buf, " %s", quote_identifier(refname));

View File

@ -122,7 +122,9 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -199,6 +199,7 @@ extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList)
extern List * WhereClauseList(FromExpr *fromExpr);
extern List * QualifierList(FromExpr *fromExpr);
extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern List * pull_var_clause_default(Node *node);

View File

@ -187,3 +187,18 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
DROP TABLE mx_table_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
create_distributed_table
--------------------------
(1 row)

View File

@ -1479,10 +1479,29 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY raw_events_first, line 1: "103,103"
ROLLBACK;
-- Views does not work
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(16, now(), 60, 600, 6000.1, 60000);
SELECT count(*) FROM raw_events_second;
count
-------
11
(1 row)
INSERT INTO raw_events_second SELECT * FROM test_view;
ERROR: cannot plan queries that include both regular and partitioned relations
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(17, now(), 60, 600, 6000.1, 60000);
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second;
count
-------
13
(1 row)
-- inserting into views does not
INSERT INTO test_view SELECT * FROM raw_events_second;
ERROR: cannot insert into view over distributed table
-- we need this in our next test
truncate raw_events_first;
SET client_min_messages TO DEBUG4;

View File

@ -0,0 +1,253 @@
--
-- MULTI_VIEW
--
-- This file contains test cases for view support. It verifies various
-- Citus features: simple selects, aggregates, joins, outer joins
-- router queries, single row inserts, multi row inserts via insert
-- into select, multi row insert via copy commands.
SELECT count(*) FROM lineitem_hash_part;
count
-------
12000
(1 row)
SELECT count(*) FROM orders_hash_part;
count
-------
2984
(1 row)
-- create a view for priority orders
CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM';
-- aggregate pushdown
SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
o_orderpriority | count
-----------------+-------
2-HIGH | 593
1-URGENT | 603
(2 rows)
SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1;
o_orderpriority | count
-----------------+-------
2-HIGH | 593
1-URGENT | 603
(2 rows)
-- filters
SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
o_orderpriority | all | fullfilled
-----------------+-----+------------
2-HIGH | 593 | 271
1-URGENT | 603 | 280
(2 rows)
-- having
SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
o_orderdate | count
-------------+-------
08-20-1996 | 5
10-10-1994 | 4
05-05-1994 | 4
04-07-1994 | 4
03-17-1993 | 4
(5 rows)
-- having with filters
SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
o_orderdate | all | count
-------------+-----+-------
08-20-1996 | 5 | 0
10-10-1994 | 4 | 4
05-05-1994 | 4 | 4
04-07-1994 | 4 | 4
03-17-1993 | 4 | 4
(5 rows)
-- limit
SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ;
o_orderkey | o_totalprice
------------+--------------
4421 | 401055.62
10209 | 400191.77
11142 | 395039.05
14179 | 384265.43
11296 | 378166.33
(5 rows)
SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ;
o_orderkey | o_totalprice
------------+--------------
14179 | 384265.43
(1 row)
CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey);
SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5;
l_orderkey | count
------------+-------
7 | 7
225 | 7
226 | 7
322 | 7
326 | 7
(5 rows)
CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR';
-- join between view and table
SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey);
count
-------
1706
(1 row)
-- join between views
SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
count
-------
700
(1 row)
-- count distinct on partition column is not supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
count
-------
1
(1 row)
-- select distinct on router joins of views also works
SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
o_orderkey
------------
231
(1 row)
-- left join support depends on flattening of the query
-- following query fails since the inner part is kept as subquery
SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries in outer joins are not supported
-- however, this works
SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
count
-------
700
(1 row)
-- view at the inner side of is not supported
SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries in outer joins are not supported
-- but view at the outer side is. This is essentially the same as a left join with arguments reversed.
SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
count
-------
700
(1 row)
-- left join on router query is supported
SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey)
WHERE o_orderkey = 2;
o_orderkey | l_linenumber
------------+--------------
2 |
(1 row)
-- repartition query on view join
-- it passes planning, fails at execution stage
SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
SET citus.task_executor_type to "task-tracker";
SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
count
-------
192
(1 row)
SET citus.task_executor_type to DEFAULT;
-- insert into... select works with views
CREATE TABLE temp_lineitem(LIKE lineitem_hash_part);
SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part');
create_distributed_table
--------------------------
(1 row)
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems;
SELECT count(*) FROM temp_lineitem;
count
-------
1706
(1 row)
-- following is a where false query, should not be inserting anything
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL';
SELECT count(*) FROM temp_lineitem;
count
-------
1706
(1 row)
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
ERROR: cannot insert into view over distributed table
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
-- logically same query without a view works fine
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
l_suppkey | count
-----------+-------
7680 | 4
160 | 3
1042 | 3
1318 | 3
5873 | 3
(5 rows)
-- when a view is replaced by actual query it still fails
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi
GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
SET citus.task_executor_type to DEFAULT;
-- create a view with aggregate
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
-- following will fail due to non-flattening of subquery due to GROUP BY
SELECT * FROM lineitems_by_shipping_method;
ERROR: bogus varno: 0
-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
ERROR: bogus varno: 0
-- however it would work if it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
l_orderkey | count
------------+-------
100 | 5
(1 row)
DROP TABLE temp_lineitem CASCADE;

View File

@ -1,5 +1,5 @@
--
-- MULTI_STAGE_DATA
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
@ -20,3 +20,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -56,6 +56,7 @@ test: multi_dropped_column_aliases
test: multi_binary_master_copy_format
test: multi_prepare_sql multi_prepare_plsql
test: multi_sql_function
test: multi_view
# ----------
# Parallel TPC-H tests to check our distributed execution behavior

View File

@ -1,5 +1,5 @@
--
-- MULTI_STAGE_DATA
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
@ -16,3 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -136,3 +136,11 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
DROP TABLE mx_table_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');

View File

@ -744,9 +744,19 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
\.
ROLLBACK;
-- Views does not work
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(16, now(), 60, 600, 6000.1, 60000);
SELECT count(*) FROM raw_events_second;
INSERT INTO raw_events_second SELECT * FROM test_view;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(17, now(), 60, 600, 6000.1, 60000);
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second;
-- inserting into views does not
INSERT INTO test_view SELECT * FROM raw_events_second;
-- we need this in our next test
truncate raw_events_first;

View File

@ -0,0 +1,139 @@
--
-- MULTI_VIEW
--
-- This file contains test cases for view support. It verifies various
-- Citus features: simple selects, aggregates, joins, outer joins
-- router queries, single row inserts, multi row inserts via insert
-- into select, multi row insert via copy commands.
SELECT count(*) FROM lineitem_hash_part;
SELECT count(*) FROM orders_hash_part;
-- create a view for priority orders
CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM';
-- aggregate pushdown
SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1;
-- filters
SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
-- having
SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
-- having with filters
SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
-- limit
SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ;
SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ;
CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey);
SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5;
CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR';
-- join between view and table
SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- join between views
SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is not supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
-- select distinct on router joins of views also works
SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
-- left join support depends on flattening of the query
-- following query fails since the inner part is kept as subquery
SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- however, this works
SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- view at the inner side of is not supported
SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- but view at the outer side is. This is essentially the same as a left join with arguments reversed.
SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- left join on router query is supported
SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey)
WHERE o_orderkey = 2;
-- repartition query on view join
-- it passes planning, fails at execution stage
SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
SET citus.task_executor_type to "task-tracker";
SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
SET citus.task_executor_type to DEFAULT;
-- insert into... select works with views
CREATE TABLE temp_lineitem(LIKE lineitem_hash_part);
SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part');
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems;
SELECT count(*) FROM temp_lineitem;
-- following is a where false query, should not be inserting anything
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL';
SELECT count(*) FROM temp_lineitem;
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
-- logically same query without a view works fine
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
-- when a view is replaced by actual query it still fails
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi
GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
SET citus.task_executor_type to DEFAULT;
-- create a view with aggregate
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
-- following will fail due to non-flattening of subquery due to GROUP BY
SELECT * FROM lineitems_by_shipping_method;
-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
-- however it would work if it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
DROP TABLE temp_lineitem CASCADE;