diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 6b31caac1..c5028ba9c 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -3280,7 +3280,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Expr *targetExpression = targetEntry->expr; - bool isPartitionColumn = IsPartitionColumnRecursive(targetExpression, query); + bool isPartitionColumn = IsPartitionColumn(targetExpression, query); if (isPartitionColumn) { FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, @@ -3312,24 +3312,52 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) /* - * IsPartitionColumnRecursive recursively checks if the given column is a partition - * column. If a column is referenced from a regular table, we directly check if - * it is a partition column. If a column is referenced from a subquery, then we - * recursively check that subquery until we reach the source of that column, and - * verify this column is a partition column. If a column is referenced from a - * join range table entry, then we resolve which join column it refers and - * recursively check this column with the same query. + * IsPartitionColumn returns true if the given column is a partition column. + * The function uses FindReferencedTableColumn to find the original relation + * id and column that the column expression refers to. It then checks whether + * that column is a partition column of the relation. * - * Note that if the given expression is a field of a composite type, then this - * function checks if this composite column is a partition column. - * - * Also, the function returns always false for reference tables given that reference - * tables do not have partition column. 
+ * Also, the function returns always false for reference tables given that + * reference tables do not have partition column. The function does not + * support queries with CTEs, it would return false if columnExpression + * refers to a column returned by a CTE. */ bool -IsPartitionColumnRecursive(Expr *columnExpression, Query *query) +IsPartitionColumn(Expr *columnExpression, Query *query) { bool isPartitionColumn = false; + Oid relationId = InvalidOid; + Var *column = NULL; + + FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column); + + if (relationId != InvalidOid && column != NULL) + { + Var *partitionColumn = PartitionKey(relationId); + + /* not all distributed tables have partition column */ + if (partitionColumn != NULL && column->varattno == partitionColumn->varattno) + { + isPartitionColumn = true; + } + } + + return isPartitionColumn; +} + + +/* + * FindReferencedTableColumn recursively traverses query tree to find actual relation + * id, and column that columnExpression refers to. If columnExpression is a + * non-relational or computed/derived expression, the function returns InvalidOid for + * relationId and NULL for column. The caller should provide parent query list from + * top of the tree to this particular Query's parent. This argument is used to look + * into CTEs that may be present in the query. 
+ */ +void +FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query, + Oid *relationId, Var **column) +{ Var *candidateColumn = NULL; List *rangetableList = query->rtable; Index rangeTableEntryIndex = 0; @@ -3337,6 +3365,9 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query) Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions( (Node *) columnExpression); + *relationId = InvalidOid; + *column = NULL; + if (IsA(strippedColumnExpression, Var)) { candidateColumn = (Var *) strippedColumnExpression; @@ -3350,17 +3381,11 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query) { candidateColumn = (Var *) fieldExpression; } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Only references to column fields are supported"))); - } } if (candidateColumn == NULL) { - return false; + return; } rangeTableEntryIndex = candidateColumn->varno - 1; @@ -3368,18 +3393,8 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query) if (rangeTableEntry->rtekind == RTE_RELATION) { - Oid relationId = rangeTableEntry->relid; - Var *partitionColumn = PartitionKey(relationId); - - /* reference tables do not have partition column */ - if (partitionColumn == NULL) - { - isPartitionColumn = false; - } - else if (candidateColumn->varattno == partitionColumn->varattno) - { - isPartitionColumn = true; - } + *relationId = rangeTableEntry->relid; + *column = candidateColumn; } else if (rangeTableEntry->rtekind == RTE_SUBQUERY) { @@ -3387,9 +3402,12 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query) List *targetEntryList = subquery->targetList; AttrNumber targetEntryIndex = candidateColumn->varattno - 1; TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex); + Expr *subColumnExpression = subqueryTargetEntry->expr; - Expr *subqueryExpression = subqueryTargetEntry->expr; - isPartitionColumn = 
IsPartitionColumnRecursive(subqueryExpression, subquery); + /* append current query to parent query list */ + parentQueryList = lappend(parentQueryList, query); + FindReferencedTableColumn(subColumnExpression, parentQueryList, + subquery, relationId, column); } else if (rangeTableEntry->rtekind == RTE_JOIN) { @@ -3397,10 +3415,52 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query) AttrNumber joinColumnIndex = candidateColumn->varattno - 1; Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); - isPartitionColumn = IsPartitionColumnRecursive(joinColumn, query); + /* parent query list stays the same since still in the same query boundary */ + FindReferencedTableColumn(joinColumn, parentQueryList, query, + relationId, column); } + else if (rangeTableEntry->rtekind == RTE_CTE) + { + int cteParentListIndex = list_length(parentQueryList) - + rangeTableEntry->ctelevelsup - 1; + Query *cteParentQuery = NULL; + List *cteList = NIL; + ListCell *cteListCell = NULL; + CommonTableExpr *cte = NULL; - return isPartitionColumn; + /* + * This should have been an error case, not marking it as error at the + * moment due to usage from IsPartitionColumn. Callers of that function + * do not have access to parent query list. 
+ */ + if (cteParentListIndex >= 0) + { + cteParentQuery = list_nth(parentQueryList, cteParentListIndex); + cteList = cteParentQuery->cteList; + } + + foreach(cteListCell, cteList) + { + CommonTableExpr *candidateCte = (CommonTableExpr *) lfirst(cteListCell); + if (strcmp(candidateCte->ctename, rangeTableEntry->ctename) == 0) + { + cte = candidateCte; + break; + } + } + + if (cte != NULL) + { + Query *cteQuery = (Query *) cte->ctequery; + List *targetEntryList = cteQuery->targetList; + AttrNumber targetEntryIndex = candidateColumn->varattno - 1; + TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex); + + parentQueryList = lappend(parentQueryList, query); + FindReferencedTableColumn(targetEntry->expr, parentQueryList, + cteQuery, relationId, column); + } + } } @@ -3669,10 +3729,10 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery) continue; } - outerColumnIsPartitionColumn = IsPartitionColumnRecursive(outerQueryExpression, - parentQuery); - localColumnIsPartitionColumn = IsPartitionColumnRecursive(localQueryExpression, - lateralQuery); + outerColumnIsPartitionColumn = IsPartitionColumn(outerQueryExpression, + parentQuery); + localColumnIsPartitionColumn = IsPartitionColumn(localQueryExpression, + lateralQuery); if (outerColumnIsPartitionColumn && localColumnIsPartitionColumn) { @@ -3747,8 +3807,8 @@ JoinOnPartitionColumn(Query *query) leftArgument = (Expr *) linitial(joinArgumentList); rightArgument = (Expr *) lsecond(joinArgumentList); - isLeftColumnPartitionColumn = IsPartitionColumnRecursive(leftArgument, query); - isRightColumnPartitionColumn = IsPartitionColumnRecursive(rightArgument, query); + isLeftColumnPartitionColumn = IsPartitionColumn(leftArgument, query); + isRightColumnPartitionColumn = IsPartitionColumn(rightArgument, query); if (isLeftColumnPartitionColumn && isRightColumnPartitionColumn) { diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c 
index 9ec88a873..1d2e071cd 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -16,6 +16,7 @@ #include "access/nbtree.h" #include "catalog/pg_am.h" +#include "catalog/pg_class.h" #include "commands/defrem.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" @@ -301,7 +302,7 @@ MultiPlanTree(Query *queryTree) * elements. */ joinClauseList = JoinClauseList(whereClauseList); - tableEntryList = TableEntryList(rangeTableList); + tableEntryList = UsedTableEntryList(queryTree); /* build the list of multi table nodes */ tableNodeList = MultiTableNodeList(tableEntryList, rangeTableList); @@ -1095,6 +1096,38 @@ TableEntryList(List *rangeTableList) } +/* + * UsedTableEntryList returns list of relation range table entries + * that are referenced within the query. Unused entries due to query + * flattening or rewriting are ignored. + */ +List * +UsedTableEntryList(Query *query) +{ + List *tableEntryList = NIL; + List *rangeTableList = query->rtable; + List *joinTreeTableIndexList = NIL; + ListCell *joinTreeTableIndexCell = NULL; + + ExtractRangeTableIndexWalker((Node *) query->jointree, &joinTreeTableIndexList); + foreach(joinTreeTableIndexCell, joinTreeTableIndexList) + { + int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell); + RangeTblEntry *rangeTableEntry = rt_fetch(joinTreeTableIndex, rangeTableList); + if (rangeTableEntry->rtekind == RTE_RELATION) + { + TableEntry *tableEntry = (TableEntry *) palloc0(sizeof(TableEntry)); + tableEntry->relationId = rangeTableEntry->relid; + tableEntry->rangeTableId = joinTreeTableIndex; + + tableEntryList = lappend(tableEntryList, tableEntry); + } + } + + return tableEntryList; +} + + /* * MultiTableNodeList builds a list of MultiTable nodes from the given table * entry list. 
A multi table node represents one entry from the range table @@ -1592,7 +1625,8 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList) foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (rangeTableEntry->rtekind == RTE_RELATION) + if (rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind != RELKIND_VIEW) { (*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTableEntry); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c04534354..311cfbd67 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -496,7 +496,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval { TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumnRecursive(targetEntry->expr, subqery) && + if (IsPartitionColumn(targetEntry->expr, subqery) && IsA(targetEntry->expr, Var)) { targetPartitionColumnVar = (Var *) targetEntry->expr; @@ -641,12 +641,25 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, Oid selectPartitionColumnTableId = InvalidOid; Oid targetRelationId = insertRte->relid; char targetPartitionMethod = PartitionMethod(targetRelationId); + ListCell *rangeTableCell = NULL; /* we only do this check for INSERT ... 
SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); EnsureSchemaNode(); + /* we do not expect to see a view in modify target */ + foreach(rangeTableCell, queryTree->rtable) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + if (rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_VIEW) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot insert into view over distributed table"))); + } + } + subquery = subqueryRte->subquery; if (contain_volatile_functions((Node *) queryTree)) @@ -654,9 +667,8 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), - errdetail( - "Volatile functions are not allowed in INSERT ... " - "SELECT queries"))); + errdetail("Volatile functions are not allowed in " + "INSERT ... SELECT queries"))); } /* we don't support LIMIT, OFFSET and WINDOW functions */ @@ -821,6 +833,9 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse AttrNumber originalAttrNo = get_attnum(insertRelationId, targetEntry->resname); TargetEntry *subqeryTargetEntry = NULL; + Oid originalRelationId = InvalidOid; + Var *originalColumn = NULL; + List *parentQueryList = NIL; if (originalAttrNo != insertPartitionColumn->varattno) { @@ -836,25 +851,36 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse break; } - /* - * Reference tables doesn't have a partition column, thus partition columns - * cannot match at all. 
- */ - if (PartitionMethod(subqeryTargetEntry->resorigtbl) == DISTRIBUTE_BY_NONE) + parentQueryList = list_make2(query, subquery); + FindReferencedTableColumn(subqeryTargetEntry->expr, + parentQueryList, subquery, + &originalRelationId, + &originalColumn); + + if (originalRelationId == InvalidOid) { partitionColumnsMatch = false; break; } - if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery)) + /* + * Reference tables don't have a partition column, thus partition columns + * cannot match at all. + */ + if (PartitionMethod(originalRelationId) == DISTRIBUTE_BY_NONE) + { + partitionColumnsMatch = false; + break; + } + + if (!IsPartitionColumn(subqeryTargetEntry->expr, subquery)) { partitionColumnsMatch = false; break; } partitionColumnsMatch = true; - *selectPartitionColumnTableId = subqeryTargetEntry->resorigtbl; - + *selectPartitionColumnTableId = originalRelationId; break; } } @@ -916,7 +942,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery) { TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumnRecursive(targetEntry->expr, subquery) && + if (IsPartitionColumn(targetEntry->expr, subquery) && IsA(targetEntry->expr, Var)) { targetPartitionColumnVar = (Var *) targetEntry->expr; @@ -1077,6 +1103,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } queryTableCount++; + + /* we do not expect to see a view in modify query */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot modify views over distributed tables"))); + } } else if (rangeTableEntry->rtekind == RTE_VALUES) { diff --git a/src/backend/distributed/utils/ruleutils_95.c b/src/backend/distributed/utils/ruleutils_95.c index 2b710aa93..f8a60fe7d 100644 --- a/src/backend/distributed/utils/ruleutils_95.c +++ b/src/backend/distributed/utils/ruleutils_95.c @@ -6664,6 +6664,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context) if (strcmp(refname, rte->ctename) != 
0) printalias = true; } + else if (rte->rtekind == RTE_SUBQUERY) + { + /* subquery requires alias too */ + printalias = true; + } if (printalias) appendStringInfo(buf, " %s", quote_identifier(refname)); diff --git a/src/backend/distributed/utils/ruleutils_96.c b/src/backend/distributed/utils/ruleutils_96.c index fd8e71ce2..3e4a38ddf 100644 --- a/src/backend/distributed/utils/ruleutils_96.c +++ b/src/backend/distributed/utils/ruleutils_96.c @@ -6739,6 +6739,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context) if (strcmp(refname, rte->ctename) != 0) printalias = true; } + else if (rte->rtekind == RTE_SUBQUERY) + { + /* subquery requires alias too */ + printalias = true; + } if (printalias) appendStringInfo(buf, " %s", quote_identifier(refname)); diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 810cc31bd..31fd56408 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -122,7 +122,9 @@ extern bool ExtractQueryWalker(Node *node, List **queryList); extern bool LeafQuery(Query *queryTree); extern List * PartitionColumnOpExpressionList(Query *query); extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn); -extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query); +extern bool IsPartitionColumn(Expr *columnExpression, Query *query); +extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, + Query *query, Oid *relationId, Var **column); #endif /* MULTI_LOGICAL_OPTIMIZER_H */ diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 6487c7a28..48dba38ca 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -199,6 +199,7 @@ extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList) extern 
List * WhereClauseList(FromExpr *fromExpr); extern List * QualifierList(FromExpr *fromExpr); extern List * TableEntryList(List *rangeTableList); +extern List * UsedTableEntryList(Query *query); extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern List * pull_var_clause_default(Node *node); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 27caeeb87..4570273d0 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -187,3 +187,18 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl DROP TABLE mx_table_test; SET citus.shard_replication_factor TO default; +SET citus.shard_count to 4; +CREATE TABLE lineitem_hash_part (like lineitem); +SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE orders_hash_part (like orders); +SELECT create_distributed_table('orders_hash_part', 'o_orderkey'); + create_distributed_table +-------------------------- + +(1 row) + diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index c08a0c526..2743fd9ec 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1479,10 +1479,29 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications CONTEXT: COPY raw_events_first, line 1: "103,103" ROLLBACK; --- Views does not work +-- selecting from views works CREATE VIEW test_view AS SELECT * FROM raw_events_first; +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (16, now(), 60, 600, 6000.1, 
60000); +SELECT count(*) FROM raw_events_second; + count +------- + 11 +(1 row) + INSERT INTO raw_events_second SELECT * FROM test_view; -ERROR: cannot plan queries that include both regular and partitioned relations +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (17, now(), 60, 600, 6000.1, 60000); +INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; +SELECT count(*) FROM raw_events_second; + count +------- + 13 +(1 row) + +-- inserting into views does not +INSERT INTO test_view SELECT * FROM raw_events_second; +ERROR: cannot insert into view over distributed table -- we need this in our next test truncate raw_events_first; SET client_min_messages TO DEBUG4; diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out new file mode 100644 index 000000000..b8e02b49f --- /dev/null +++ b/src/test/regress/expected/multi_view.out @@ -0,0 +1,253 @@ +-- +-- MULTI_VIEW +-- +-- This file contains test cases for view support. It verifies various +-- Citus features: simple selects, aggregates, joins, outer joins +-- router queries, single row inserts, multi row inserts via insert +-- into select, multi row insert via copy commands. 
+SELECT count(*) FROM lineitem_hash_part; + count +------- + 12000 +(1 row) + +SELECT count(*) FROM orders_hash_part; + count +------- + 2984 +(1 row) + +-- create a view for priority orders +CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM'; +-- aggregate pushdown +SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1; + o_orderpriority | count +-----------------+------- + 2-HIGH | 593 + 1-URGENT | 603 +(2 rows) + +SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1; + o_orderpriority | count +-----------------+------- + 2-HIGH | 593 + 1-URGENT | 603 +(2 rows) + +-- filters +SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1; + o_orderpriority | all | fullfilled +-----------------+-----+------------ + 2-HIGH | 593 | 271 + 1-URGENT | 603 | 280 +(2 rows) + +-- having +SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc; + o_orderdate | count +-------------+------- + 08-20-1996 | 5 + 10-10-1994 | 4 + 05-05-1994 | 4 + 04-07-1994 | 4 + 03-17-1993 | 4 +(5 rows) + +-- having with filters +SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc; + o_orderdate | all | count +-------------+-----+------- + 08-20-1996 | 5 | 0 + 10-10-1994 | 4 | 4 + 05-05-1994 | 4 | 4 + 04-07-1994 | 4 | 4 + 03-17-1993 | 4 | 4 +(5 rows) + +-- limit +SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ; + o_orderkey | o_totalprice +------------+-------------- + 4421 | 401055.62 + 10209 | 400191.77 + 11142 | 395039.05 + 14179 | 384265.43 + 11296 | 378166.33 +(5 rows) + +SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ; + o_orderkey | o_totalprice 
+------------+-------------- + 14179 | 384265.43 +(1 row) + +CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey); +SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5; + l_orderkey | count +------------+------- + 7 | 7 + 225 | 7 + 226 | 7 + 322 | 7 + 326 | 7 +(5 rows) + +CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR'; +-- join between view and table +SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey); + count +------- + 1706 +(1 row) + +-- join between views +SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + count +------- + 700 +(1 row) + +-- count distinct on partition column is not supported +SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); +ERROR: cannot compute aggregate (distinct) +DETAIL: table partitioning is unsuitable for aggregate (distinct) +HINT: You can load the hll extension from contrib packages and enable distinct approximations. 
+-- count distinct on partition column is supported on router queries +SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems + ON (o_orderkey = l_orderkey) + WHERE (o_orderkey = 231); + count +------- + 1 +(1 row) + +-- select distinct on router joins of views also works +SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems + ON (o_orderkey = l_orderkey) + WHERE (o_orderkey = 231); + o_orderkey +------------ + 231 +(1 row) + +-- left join support depends on flattening of the query +-- following query fails since the inner part is kept as subquery +SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey); +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries in outer joins are not supported +-- however, this works +SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; + count +------- + 700 +(1 row) + +-- view at the inner side of is not supported +SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries in outer joins are not supported +-- but view at the outer side is. This is essentially the same as a left join with arguments reversed. 
+SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; + count +------- + 700 +(1 row) + +-- left join on router query is supported +SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey) + WHERE o_orderkey = 2; + o_orderkey | l_linenumber +------------+-------------- + 2 | +(1 row) + +-- repartition query on view join +-- it passes planning, fails at execution stage +SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". +SET citus.task_executor_type to "task-tracker"; +SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); + count +------- + 192 +(1 row) + +SET citus.task_executor_type to DEFAULT; +-- insert into... select works with views +CREATE TABLE temp_lineitem(LIKE lineitem_hash_part); +SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems; +SELECT count(*) FROM temp_lineitem; + count +------- + 1706 +(1 row) + +-- following is a where false query, should not be inserting anything +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL'; +SELECT count(*) FROM temp_lineitem; + count +------- + 1706 +(1 row) + +-- modifying views is disallowed +INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; +ERROR: cannot insert into view over distributed table +SET citus.task_executor_type to "task-tracker"; +-- single view repartition subqueries are not supported +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; 
+ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries without group by clause are not supported yet +-- logically same query without a view works fine +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; + l_suppkey | count +-----------+------- + 7680 | 4 + 160 | 3 + 1042 | 3 + 1318 | 3 + 5873 | 3 +(5 rows) + +-- when a view is replaced by actual query it still fails +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi + GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries without group by clause are not supported yet +SET citus.task_executor_type to DEFAULT; +-- create a view with aggregate +CREATE VIEW lineitems_by_shipping_method AS + SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; +-- following will fail due to non-flattening of subquery due to GROUP BY +SELECT * FROM lineitems_by_shipping_method; +ERROR: bogus varno: 0 +-- create a view with group by on partition column +CREATE VIEW lineitems_by_orderkey AS + SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; +-- this will also fail due to same reason +SELECT * FROM lineitems_by_orderkey; +ERROR: bogus varno: 0 +-- however it would work if it is made router plannable +SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; + l_orderkey | count +------------+------- + 100 | 5 +(1 row) + +DROP TABLE temp_lineitem CASCADE; diff --git a/src/test/regress/input/multi_load_data.source b/src/test/regress/input/multi_load_data.source index d7b00c874..6fd19d346 100644 --- a/src/test/regress/input/multi_load_data.source +++ b/src/test/regress/input/multi_load_data.source @@ -1,5 +1,5 @@ -- --- 
MULTI_STAGE_DATA +-- MULTI_LOAD_DATA -- -- Tests for loading data in a distributed cluster. Please note that the number -- of shards uploaded depends on two config values: citus.shard_replication_factor and @@ -20,3 +20,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000; \copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|' \copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' \copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' +\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' +\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 2043cdae8..640488d5a 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -56,6 +56,7 @@ test: multi_dropped_column_aliases test: multi_binary_master_copy_format test: multi_prepare_sql multi_prepare_plsql test: multi_sql_function +test: multi_view # ---------- # Parallel TPC-H tests to check our distributed execution behavior diff --git a/src/test/regress/output/multi_load_data.source b/src/test/regress/output/multi_load_data.source index 94e07980d..3d694090a 100644 --- a/src/test/regress/output/multi_load_data.source +++ b/src/test/regress/output/multi_load_data.source @@ -1,5 +1,5 @@ -- --- MULTI_STAGE_DATA +-- MULTI_LOAD_DATA -- -- Tests for loading data in a distributed cluster. 
Please note that the number -- of shards uploaded depends on two config values: citus.shard_replication_factor and @@ -16,3 +16,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000; \copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|' \copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' \copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|' +\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' +\copy lineitem_hash_part FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' +\copy orders_hash_part FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\copy orders_hash_part FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 1f0dfef22..0414653fc 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -136,3 +136,11 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='mx_table_test'::regcl DROP TABLE mx_table_test; SET citus.shard_replication_factor TO default; + +SET citus.shard_count to 4; + +CREATE TABLE lineitem_hash_part (like lineitem); +SELECT create_distributed_table('lineitem_hash_part', 'l_orderkey'); + +CREATE TABLE orders_hash_part (like orders); +SELECT create_distributed_table('orders_hash_part', 'o_orderkey'); diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index db1e0b7bf..85cd3c5f5 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -744,9 +744,19 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; \. 
ROLLBACK; --- Views does not work +-- selecting from views works CREATE VIEW test_view AS SELECT * FROM raw_events_first; +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (16, now(), 60, 600, 6000.1, 60000); +SELECT count(*) FROM raw_events_second; INSERT INTO raw_events_second SELECT * FROM test_view; +INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES + (17, now(), 60, 600, 6000.1, 60000); +INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; +SELECT count(*) FROM raw_events_second; + +-- inserting into views does not +INSERT INTO test_view SELECT * FROM raw_events_second; -- we need this in our next test truncate raw_events_first; diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql new file mode 100644 index 000000000..164e5a48f --- /dev/null +++ b/src/test/regress/sql/multi_view.sql @@ -0,0 +1,139 @@ +-- +-- MULTI_VIEW +-- + +-- This file contains test cases for view support. It verifies various +-- Citus features: simple selects, aggregates, joins, outer joins +-- router queries, single row inserts, multi row inserts via insert +-- into select, multi row insert via copy commands. 
+ +SELECT count(*) FROM lineitem_hash_part; + +SELECT count(*) FROM orders_hash_part; + +-- create a view for priority orders +CREATE VIEW priority_orders AS SELECT * FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM'; + +-- aggregate pushdown +SELECT o_orderpriority, count(*) FROM priority_orders GROUP BY 1 ORDER BY 2, 1; + +SELECT o_orderpriority, count(*) FROM orders_hash_part WHERE o_orderpriority < '3-MEDIUM' GROUP BY 1 ORDER BY 2,1; + +-- filters +SELECT o_orderpriority, count(*) as all, count(*) FILTER (WHERE o_orderstatus ='F') as fullfilled FROM priority_orders GROUP BY 1 ORDER BY 2, 1; + +-- having +SELECT o_orderdate, count(*) from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc; + +-- having with filters +SELECT o_orderdate, count(*) as all, count(*) FILTER(WHERE o_orderstatus = 'F') from priority_orders group by 1 having (count(*) > 3) order by 2 desc, 1 desc; + +-- limit +SELECT o_orderkey, o_totalprice from orders_hash_part order by 2 desc, 1 asc limit 5 ; + +SELECT o_orderkey, o_totalprice from priority_orders order by 2 desc, 1 asc limit 1 ; + +CREATE VIEW priority_lineitem AS SELECT li.* FROM lineitem_hash_part li JOIN priority_orders ON (l_orderkey = o_orderkey); + +SELECT l_orderkey, count(*) FROM priority_lineitem GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5; + +CREATE VIEW air_shipped_lineitems AS SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR'; + +-- join between view and table +SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey = l_orderkey); + +-- join between views +SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + +-- count distinct on partition column is not supported +SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + +-- count distinct on partition column is supported on router queries +SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems 
+ ON (o_orderkey = l_orderkey) + WHERE (o_orderkey = 231); + +-- select distinct on router joins of views also works +SELECT distinct(o_orderkey) FROM priority_orders join air_shipped_lineitems + ON (o_orderkey = l_orderkey) + WHERE (o_orderkey = 231); + +-- left join support depends on flattening of the query +-- following query fails since the inner part is kept as subquery +SELECT * FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey); + +-- however, this works +SELECT count(*) FROM priority_orders left join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; + +-- view at the inner side of is not supported +SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; + +-- but view at the outer side is. This is essentially the same as a left join with arguments reversed. +SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; + + +-- left join on router query is supported +SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey) + WHERE o_orderkey = 2; + +-- repartition query on view join +-- it passes planning, fails at execution stage +SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); + +SET citus.task_executor_type to "task-tracker"; +SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); +SET citus.task_executor_type to DEFAULT; + +-- insert into... 
select works with views +CREATE TABLE temp_lineitem(LIKE lineitem_hash_part); +SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part'); +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems; +SELECT count(*) FROM temp_lineitem; +-- following is a where false query, should not be inserting anything +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL'; +SELECT count(*) FROM temp_lineitem; + +-- modifying views is disallowed +INSERT INTO air_shipped_lineitems SELECT * from temp_lineitem; + +SET citus.task_executor_type to "task-tracker"; + +-- single view repartition subqueries are not supported +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM air_shipped_lineitems GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; + +-- logically same query without a view works fine +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM lineitem_hash_part WHERE l_shipmode = 'AIR' GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; + +-- when a view is replaced by actual query it still fails +SELECT l_suppkey, count(*) FROM + (SELECT l_suppkey, l_shipdate, count(*) + FROM (SELECT * FROM lineitem_hash_part WHERE l_shipmode = 'AIR') asi + GROUP BY l_suppkey, l_shipdate) supps + GROUP BY l_suppkey ORDER BY 2 DESC, 1 LIMIT 5; + +SET citus.task_executor_type to DEFAULT; + +-- create a view with aggregate +CREATE VIEW lineitems_by_shipping_method AS + SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; + +-- following will fail due to non-flattening of subquery due to GROUP BY +SELECT * FROM lineitems_by_shipping_method; + +-- create a view with group by on partition column +CREATE VIEW lineitems_by_orderkey AS + SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; + +-- this will also fail due to same reason +SELECT * FROM lineitems_by_orderkey; + +-- 
however it would work if it is made router plannable +SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; + +DROP TABLE temp_lineitem CASCADE;