Merge pull request #1024 from citusdata/feature/442_view_support

Add view support for select queries
pull/1105/head
Murat Tuncer 2017-01-13 09:02:07 +02:00 committed by GitHub
commit 0af5d553d1
17 changed files with 661 additions and 68 deletions

View File

@ -3280,7 +3280,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumnRecursive(targetExpression, query);
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
if (isPartitionColumn)
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
@ -3312,24 +3312,52 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
/*
* IsPartitionColumnRecursive recursively checks if the given column is a partition
* column. If a column is referenced from a regular table, we directly check if
* it is a partition column. If a column is referenced from a subquery, then we
* recursively check that subquery until we reach the source of that column, and
* verify this column is a partition column. If a column is referenced from a
* join range table entry, then we resolve which join column it refers and
* recursively check this column with the same query.
* IsPartitionColumn returns true if the given column is a partition column.
* The function uses FindReferencedTableColumn to find the original relation
* id and column that the column expression refers to. It then checks whether
* that column is a partition column of the relation.
*
* Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column.
*
* Also, the function returns always false for reference tables given that reference
* tables do not have partition column.
* Also, the function returns always false for reference tables given that
* reference tables do not have partition column. The function does not
* support queries with CTEs, it would return false if columnExpression
* refers to a column returned by a CTE.
*/
bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
IsPartitionColumn(Expr *columnExpression, Query *query)
{
bool isPartitionColumn = false;
Oid relationId = InvalidOid;
Var *column = NULL;
FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column);
if (relationId != InvalidOid && column != NULL)
{
Var *partitionColumn = PartitionKey(relationId);
/* not all distributed tables have partition column */
if (partitionColumn != NULL && column->varattno == partitionColumn->varattno)
{
isPartitionColumn = true;
}
}
return isPartitionColumn;
}
/*
* FindReferencedTableColumn recursively traverses query tree to find actual relation
* id, and column that columnExpression refers to. If columnExpression is a
* non-relational or computed/derived expression, the function returns InvolidOid for
* relationId and NULL for column. The caller should provide parent query list from
* top of the tree to this particular Query's parent. This argument is used to look
* into CTEs that may be present in the query.
*/
void
FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query,
Oid *relationId, Var **column)
{
Var *candidateColumn = NULL;
List *rangetableList = query->rtable;
Index rangeTableEntryIndex = 0;
@ -3337,6 +3365,9 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions(
(Node *) columnExpression);
*relationId = InvalidOid;
*column = NULL;
if (IsA(strippedColumnExpression, Var))
{
candidateColumn = (Var *) strippedColumnExpression;
@ -3350,17 +3381,11 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
{
candidateColumn = (Var *) fieldExpression;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Only references to column fields are supported")));
}
}
if (candidateColumn == NULL)
{
return false;
return;
}
rangeTableEntryIndex = candidateColumn->varno - 1;
@ -3368,18 +3393,8 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
if (rangeTableEntry->rtekind == RTE_RELATION)
{
Oid relationId = rangeTableEntry->relid;
Var *partitionColumn = PartitionKey(relationId);
/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
isPartitionColumn = false;
}
else if (candidateColumn->varattno == partitionColumn->varattno)
{
isPartitionColumn = true;
}
*relationId = rangeTableEntry->relid;
*column = candidateColumn;
}
else if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
@ -3387,9 +3402,12 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
List *targetEntryList = subquery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex);
Expr *subColumnExpression = subqueryTargetEntry->expr;
Expr *subqueryExpression = subqueryTargetEntry->expr;
isPartitionColumn = IsPartitionColumnRecursive(subqueryExpression, subquery);
/* append current query to parent query list */
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(subColumnExpression, parentQueryList,
subquery, relationId, column);
}
else if (rangeTableEntry->rtekind == RTE_JOIN)
{
@ -3397,10 +3415,52 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
AttrNumber joinColumnIndex = candidateColumn->varattno - 1;
Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex);
isPartitionColumn = IsPartitionColumnRecursive(joinColumn, query);
/* parent query list stays the same since still in the same query boundary */
FindReferencedTableColumn(joinColumn, parentQueryList, query,
relationId, column);
}
else if (rangeTableEntry->rtekind == RTE_CTE)
{
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
List *cteList = NIL;
ListCell *cteListCell = NULL;
CommonTableExpr *cte = NULL;
return isPartitionColumn;
/*
* This should have been an error case, not marking it as error at the
* moment due to usage from IsPartitionColumn. Callers of that function
* do not have access to parent query list.
*/
if (cteParentListIndex >= 0)
{
cteParentQuery = list_nth(parentQueryList, cteParentListIndex);
cteList = cteParentQuery->cteList;
}
foreach(cteListCell, cteList)
{
CommonTableExpr *candidateCte = (CommonTableExpr *) lfirst(cteListCell);
if (strcmp(candidateCte->ctename, rangeTableEntry->ctename) == 0)
{
cte = candidateCte;
break;
}
}
if (cte != NULL)
{
Query *cteQuery = (Query *) cte->ctequery;
List *targetEntryList = cteQuery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(targetEntry->expr, parentQueryList,
cteQuery, relationId, column);
}
}
}
@ -3669,10 +3729,10 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery)
continue;
}
outerColumnIsPartitionColumn = IsPartitionColumnRecursive(outerQueryExpression,
parentQuery);
localColumnIsPartitionColumn = IsPartitionColumnRecursive(localQueryExpression,
lateralQuery);
outerColumnIsPartitionColumn = IsPartitionColumn(outerQueryExpression,
parentQuery);
localColumnIsPartitionColumn = IsPartitionColumn(localQueryExpression,
lateralQuery);
if (outerColumnIsPartitionColumn && localColumnIsPartitionColumn)
{
@ -3747,8 +3807,8 @@ JoinOnPartitionColumn(Query *query)
leftArgument = (Expr *) linitial(joinArgumentList);
rightArgument = (Expr *) lsecond(joinArgumentList);
isLeftColumnPartitionColumn = IsPartitionColumnRecursive(leftArgument, query);
isRightColumnPartitionColumn = IsPartitionColumnRecursive(rightArgument, query);
isLeftColumnPartitionColumn = IsPartitionColumn(leftArgument, query);
isRightColumnPartitionColumn = IsPartitionColumn(rightArgument, query);
if (isLeftColumnPartitionColumn && isRightColumnPartitionColumn)
{

View File

@ -16,6 +16,7 @@
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_class.h"
#include "commands/defrem.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
@ -301,7 +302,7 @@ MultiPlanTree(Query *queryTree)
* elements.
*/
joinClauseList = JoinClauseList(whereClauseList);
tableEntryList = TableEntryList(rangeTableList);
tableEntryList = UsedTableEntryList(queryTree);
/* build the list of multi table nodes */
tableNodeList = MultiTableNodeList(tableEntryList, rangeTableList);
@ -1095,6 +1096,38 @@ TableEntryList(List *rangeTableList)
}
/*
* UsedTableEntryList returns list of relation range table entries
* that are referenced within the query. Unused entries due to query
* flattening or re-rewriting are ignored.
*/
List *
UsedTableEntryList(Query *query)
{
List *tableEntryList = NIL;
List *rangeTableList = query->rtable;
List *joinTreeTableIndexList = NIL;
ListCell *joinTreeTableIndexCell = NULL;
ExtractRangeTableIndexWalker((Node *) query->jointree, &joinTreeTableIndexList);
foreach(joinTreeTableIndexCell, joinTreeTableIndexList)
{
int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell);
RangeTblEntry *rangeTableEntry = rt_fetch(joinTreeTableIndex, rangeTableList);
if (rangeTableEntry->rtekind == RTE_RELATION)
{
TableEntry *tableEntry = (TableEntry *) palloc0(sizeof(TableEntry));
tableEntry->relationId = rangeTableEntry->relid;
tableEntry->rangeTableId = joinTreeTableIndex;
tableEntryList = lappend(tableEntryList, tableEntry);
}
}
return tableEntryList;
}
/*
* MultiTableNodeList builds a list of MultiTable nodes from the given table
* entry list. A multi table node represents one entry from the range table
@ -1592,7 +1625,8 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind == RTE_RELATION)
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind != RELKIND_VIEW)
{
(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTableEntry);
}

View File

@ -1515,7 +1515,6 @@ UpdateColumnAttributes(Var *column, List *rangeTableList, List *dependedJobList)
static Index
NewTableId(Index originalTableId, List *rangeTableList)
{
Index newTableId = 0;
Index rangeTableIndex = 1;
ListCell *rangeTableCell = NULL;
@ -1530,14 +1529,15 @@ NewTableId(Index originalTableId, List *rangeTableList)
listMember = list_member_int(originalTableIdList, originalTableId);
if (listMember)
{
newTableId = rangeTableIndex;
break;
return rangeTableIndex;
}
rangeTableIndex++;
}
return newTableId;
ereport(ERROR, (errmsg("Unrecognized range table id %d", (int) originalTableId)));
return 0;
}

View File

@ -496,7 +496,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
if (IsPartitionColumnRecursive(targetEntry->expr, subqery) &&
if (IsPartitionColumn(targetEntry->expr, subqery) &&
IsA(targetEntry->expr, Var))
{
targetPartitionColumnVar = (Var *) targetEntry->expr;
@ -641,12 +641,25 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
Oid selectPartitionColumnTableId = InvalidOid;
Oid targetRelationId = insertRte->relid;
char targetPartitionMethod = PartitionMethod(targetRelationId);
ListCell *rangeTableCell = NULL;
/* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree));
EnsureSchemaNode();
/* we do not expect to see a view in modify target */
foreach(rangeTableCell, queryTree->rtable)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_VIEW)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot insert into view over distributed table")));
}
}
subquery = subqueryRte->subquery;
if (contain_volatile_functions((Node *) queryTree))
@ -654,9 +667,8 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given "
"modification"),
errdetail(
"Volatile functions are not allowed in INSERT ... "
"SELECT queries")));
errdetail("Volatile functions are not allowed in "
"INSERT ... SELECT queries")));
}
/* we don't support LIMIT, OFFSET and WINDOW functions */
@ -821,6 +833,9 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
AttrNumber originalAttrNo = get_attnum(insertRelationId,
targetEntry->resname);
TargetEntry *subqeryTargetEntry = NULL;
Oid originalRelationId = InvalidOid;
Var *originalColumn = NULL;
List *parentQueryList = NIL;
if (originalAttrNo != insertPartitionColumn->varattno)
{
@ -836,25 +851,36 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
break;
}
/*
* Reference tables doesn't have a partition column, thus partition columns
* cannot match at all.
*/
if (PartitionMethod(subqeryTargetEntry->resorigtbl) == DISTRIBUTE_BY_NONE)
parentQueryList = list_make2(query, subquery);
FindReferencedTableColumn(subqeryTargetEntry->expr,
parentQueryList, subquery,
&originalRelationId,
&originalColumn);
if (originalRelationId == InvalidOid)
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery))
/*
* Reference tables doesn't have a partition column, thus partition columns
* cannot match at all.
*/
if (PartitionMethod(originalRelationId) == DISTRIBUTE_BY_NONE)
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumn(subqeryTargetEntry->expr, subquery))
{
partitionColumnsMatch = false;
break;
}
partitionColumnsMatch = true;
*selectPartitionColumnTableId = subqeryTargetEntry->resorigtbl;
*selectPartitionColumnTableId = originalRelationId;
break;
}
}
@ -916,7 +942,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery)
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
if (IsPartitionColumnRecursive(targetEntry->expr, subquery) &&
if (IsPartitionColumn(targetEntry->expr, subquery) &&
IsA(targetEntry->expr, Var))
{
targetPartitionColumnVar = (Var *) targetEntry->expr;
@ -1077,6 +1103,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
}
queryTableCount++;
/* we do not expect to see a view in modify query */
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot modify views over distributed tables")));
}
}
else if (rangeTableEntry->rtekind == RTE_VALUES)
{

View File

@ -6664,6 +6664,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (strcmp(refname, rte->ctename) != 0)
printalias = true;
}
else if (rte->rtekind == RTE_SUBQUERY)
{
/* subquery requires alias too */
printalias = true;
}
if (printalias)
appendStringInfo(buf, " %s", quote_identifier(refname));

View File

@ -6739,6 +6739,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (strcmp(refname, rte->ctename) != 0)
printalias = true;
}
else if (rte->rtekind == RTE_SUBQUERY)
{
/* subquery requires alias too */
printalias = true;
}
if (printalias)
appendStringInfo(buf, " %s", quote_identifier(refname));

View File

@ -122,7 +122,9 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -199,6 +199,7 @@ extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList)
extern List * WhereClauseList(FromExpr *fromExpr);
extern List * QualifierList(FromExpr *fromExpr);
extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern List * pull_var_clause_default(Node *node);

View File

@ -187,3 +187,18 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
DROP TABLE mx_table_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');
create_distributed_table
--------------------------
(1 row)

View File

@ -1479,10 +1479,29 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY raw_events_first, line 1: "103,103"
ROLLBACK;
-- Views does not work
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(16, now(), 60, 600, 6000.1, 60000);
SELECT count(*) FROM raw_events_second;
count
-------
11
(1 row)
INSERT INTO raw_events_second SELECT * FROM test_view;
ERROR: cannot plan queries that include both regular and partitioned relations
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(17, now(), 60, 600, 6000.1, 60000);
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second;
count
-------
13
(1 row)
-- inserting into views does not
INSERT INTO test_view SELECT * FROM raw_events_second;
ERROR: cannot insert into view over distributed table
-- we need this in our next test
truncate raw_events_first;
SET client_min_messages TO DEBUG4;

View File

@ -0,0 +1,253 @@
--
-- MULTI_VIEW
--
-- This file contains test cases for view support. It verifies various
-- Citus features: simple selects, aggregates, joins, outer joins
-- router queries, single row inserts, multi row inserts via insert
-- into select, multi row insert via copy commands.
SELECT count(*) FROM lineitem_hash_part;
count
-------
12000
(1 row)
SELECT count(*) FROM orders_hash_part;
count
-------
2984
(1 row)
-- create a view for priority orders
CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM';
-- aggregate pushdown
SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
o_orderpriority | count
-----------------+-------
2-HIGH | 593
1-URGENT | 603
(2 rows)
SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1;
o_orderpriority | count
-----------------+-------
2-HIGH | 593
1-URGENT | 603
(2 rows)
-- filters
SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
o_orderpriority | all | fullfilled
-----------------+-----+------------
2-HIGH | 593 | 271
1-URGENT | 603 | 280
(2 rows)
-- having
SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
o_orderdate | count
-------------+-------
08-20-1996 | 5
10-10-1994 | 4
05-05-1994 | 4
04-07-1994 | 4
03-17-1993 | 4
(5 rows)
-- having with filters
SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
o_orderdate | all | count
-------------+-----+-------
08-20-1996 | 5 | 0
10-10-1994 | 4 | 4
05-05-1994 | 4 | 4
04-07-1994 | 4 | 4
03-17-1993 | 4 | 4
(5 rows)
-- limit
SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ;
o_orderkey | o_totalprice
------------+--------------
4421 | 401055.62
10209 | 400191.77
11142 | 395039.05
14179 | 384265.43
11296 | 378166.33
(5 rows)
SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ;
o_orderkey | o_totalprice
------------+--------------
14179 | 384265.43
(1 row)
CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey);
SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5;
l_orderkey | count
------------+-------
7 | 7
225 | 7
226 | 7
322 | 7
326 | 7
(5 rows)
CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR';
-- join between view and table
SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey);
count
-------
1706
(1 row)
-- join between views
SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
count
-------
700
(1 row)
-- count distinct on partition column is not supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
count
-------
1
(1 row)
-- select distinct on router joins of views also works
SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
o_orderkey
------------
231
(1 row)
-- left join support depends on flattening of the query
-- following query fails since the inner part is kept as subquery
SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries in outer joins are not supported
-- however, this works
SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
count
-------
700
(1 row)
-- view at the inner side of is not supported
SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries in outer joins are not supported
-- but view at the outer side is. This is essentially the same as a left join with arguments reversed.
SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
count
-------
700
(1 row)
-- left join on router query is supported
SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey)
WHERE o_orderkey = 2;
o_orderkey | l_linenumber
------------+--------------
2 |
(1 row)
-- repartition query on view join
-- it passes planning, fails at execution stage
SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
SET citus.task_executor_type to "task-tracker";
SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
count
-------
192
(1 row)
SET citus.task_executor_type to DEFAULT;
-- insert into... select works with views
CREATE TABLE temp_lineitem(LIKE lineitem_hash_part);
SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part');
create_distributed_table
--------------------------
(1 row)
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems;
SELECT count(*) FROM temp_lineitem;
count
-------
1706
(1 row)
-- following is a where false query, should not be inserting anything
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL';
SELECT count(*) FROM temp_lineitem;
count
-------
1706
(1 row)
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
ERROR: cannot insert into view over distributed table
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
-- logically same query without a view works fine
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
l_suppkey | count
-----------+-------
7680 | 4
160 | 3
1042 | 3
1318 | 3
5873 | 3
(5 rows)
-- when a view is replaced by actual query it still fails
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi
GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
SET citus.task_executor_type to DEFAULT;
-- create a view with aggregate
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
-- following will fail due to non-flattening of subquery due to GROUP BY
SELECT * FROM lineitems_by_shipping_method;
ERROR: Unrecognized range table id 1
-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
ERROR: Unrecognized range table id 1
-- however it would work if it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
l_orderkey | count
------------+-------
100 | 5
(1 row)
DROP TABLE temp_lineitem CASCADE;

View File

@ -1,5 +1,5 @@
--
-- MULTI_STAGE_DATA
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
@ -20,3 +20,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -56,6 +56,7 @@ test: multi_dropped_column_aliases
test: multi_binary_master_copy_format
test: multi_prepare_sql multi_prepare_plsql
test: multi_sql_function
test: multi_view
# ----------
# Parallel TPC-H tests to check our distributed execution behavior

View File

@ -1,5 +1,5 @@
--
-- MULTI_STAGE_DATA
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
@ -16,3 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

View File

@ -136,3 +136,11 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl
DROP TABLE mx_table_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);
SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey');
CREATE TABLE orders_hash_part (like orders);
SELECT create_distributed_table('orders_hash_part', 'o_orderkey');

View File

@ -744,9 +744,19 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
\.
ROLLBACK;
-- Views does not work
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(16, now(), 60, 600, 6000.1, 60000);
SELECT count(*) FROM raw_events_second;
INSERT INTO raw_events_second SELECT * FROM test_view;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(17, now(), 60, 600, 6000.1, 60000);
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second;
-- inserting into views does not
INSERT INTO test_view SELECT * FROM raw_events_second;
-- we need this in our next test
truncate raw_events_first;

View File

@ -0,0 +1,139 @@
--
-- MULTI_VIEW
--
-- This file contains test cases for view support. It verifies various
-- Citus features: simple selects, aggregates, joins, outer joins
-- router queries, single row inserts, multi row inserts via insert
-- into select, multi row insert via copy commands.
SELECT count(*) FROM lineitem_hash_part;
SELECT count(*) FROM orders_hash_part;
-- create a view for priority orders
CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM';
-- aggregate pushdown
SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1;
-- filters
SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1;
-- having
SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
-- having with filters
SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc;
-- limit
SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ;
SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ;
CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey);
SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5;
CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR';
-- join between view and table
SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- join between views
SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is not supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
-- select distinct on router joins of views also works
SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
WHERE (o_orderkey = 231);
-- left join support depends on flattening of the query
-- following query fails since the inner part is kept as subquery
SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- however, this works
SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- view at the inner side of is not supported
SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- but view at the outer side is. This is essentially the same as a left join with arguments reversed.
SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- left join on router query is supported
SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey)
WHERE o_orderkey = 2;
-- repartition query on view join
-- it passes planning, fails at execution stage
SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
SET citus.task_executor_type to "task-tracker";
SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey);
SET citus.task_executor_type to DEFAULT;
-- insert into... select works with views
CREATE TABLE temp_lineitem(LIKE lineitem_hash_part);
SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part');
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems;
SELECT count(*) FROM temp_lineitem;
-- following is a where false query, should not be inserting anything
INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL';
SELECT count(*) FROM temp_lineitem;
-- modifying views is disallowed
INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem;
SET citus.task_executor_type to "task-tracker";
-- single view repartition subqueries are not supported
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
-- logically same query without a view works fine
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
-- when a view is replaced by actual query it still fails
SELECT l_suppkey, count(*) FROM
(SELECT l_suppkey, l_shipdate, count(*)
FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi
GROUP BY l_suppkey, l_shipdate) supps
GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5;
SET citus.task_executor_type to DEFAULT;
-- create a view with aggregate
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
-- following will fail due to non-flattening of subquery due to GROUP BY
SELECT * FROM lineitems_by_shipping_method;
-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
-- however it would work if it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
DROP TABLE temp_lineitem CASCADE;