diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index e8a294528..b5e694918 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -12,8 +12,8 @@ #include #include +#include "catalog/pg_class.h" #include "catalog/pg_type.h" - #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" #include "distributed/insert_select_planner.h" @@ -41,6 +41,7 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log /* local function forward declarations */ +static bool NeedsDistributedPlanningWalker(Node *node, void *context); static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, PlannerRestrictionContext * @@ -75,20 +76,34 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PlannerRestrictionContext *plannerRestrictionContext = NULL; bool setPartitionedTablesInherited = false; - /* - * standard_planner scribbles on it's input, but for deparsing we need the - * unmodified form. Note that we keep RTE_RELATIONs with their identities - * set, which doesn't break our goals, but, prevents us keeping an extra copy - * of the query tree. Note that we copy the query tree once we're sure it's a - * distributed query. - */ if (needsDistributedPlanning) { - setPartitionedTablesInherited = false; + /* + * Inserting into a local table needs to go through the regular postgres + * planner/executor, but the SELECT needs to go through Citus. We currently + * don't have a way of doing both things and therefore error out, but do + * have a handy tip for users. + */ + if (InsertSelectIntoLocalTable(parse)) + { + ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " + "local table"), + errhint("Consider using CREATE TEMPORARY TABLE tmp AS " + "SELECT ... and inserting from the temporary " + "table."))); + } + /* + * standard_planner scribbles on it's input, but for deparsing we need the + * unmodified form. Note that we keep RTE_RELATIONs with their identities + * set, which doesn't break our goals, but, prevents us keeping an extra copy + * of the query tree. Note that we copy the query tree once we're sure it's a + * distributed query. + */ AssignRTEIdentities(parse); originalQuery = copyObject(parse); + setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited); } @@ -147,6 +162,75 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } +/* + * NeedsDistributedPlanning returns true if the Citus extension is loaded and + * the query contains a distributed table. + * + * This function allows queries containing local tables to pass through the + * distributed planner. How to handle local tables is a decision that should + * be made within the planner + */ +bool +NeedsDistributedPlanning(Query *query) +{ + CmdType commandType = query->commandType; + if (commandType != CMD_SELECT && commandType != CMD_INSERT && + commandType != CMD_UPDATE && commandType != CMD_DELETE) + { + return false; + } + + if (!CitusHasBeenLoaded()) + { + return false; + } + + if (!NeedsDistributedPlanningWalker((Node *) query, NULL)) + { + return false; + } + + return true; +} + + +/* + * NeedsDistributedPlanningWalker checks if the query contains any distributed + * tables. + */ +static bool +NeedsDistributedPlanningWalker(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + Query *query = (Query *) node; + ListCell *rangeTableCell = NULL; + + foreach(rangeTableCell, query->rtable) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + Oid relationId = rangeTableEntry->relid; + if (IsDistributedTable(relationId)) + { + return true; + } + } + + return query_tree_walker(query, NeedsDistributedPlanningWalker, NULL, 0); + } + else + { + return expression_tree_walker(node, NeedsDistributedPlanningWalker, NULL); + } +} + + /* * AssignRTEIdentities function modifies query tree by adding RTE identities to the * RTE_RELATIONs. diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index ae38ded87..faf6e7a68 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -3478,71 +3478,6 @@ FindNodesOfType(MultiNode *node, int type) } -/* - * NeedsDistributedPlanning checks if the passed in query is a query running - * on a distributed table. If it is, we start distributed planning. - * - * For distributed relations it also assigns identifiers to the relevant RTEs. - */ -bool -NeedsDistributedPlanning(Query *queryTree) -{ - CmdType commandType = queryTree->commandType; - List *rangeTableList = NIL; - ListCell *rangeTableCell = NULL; - bool hasLocalRelation = false; - bool hasDistributedRelation = false; - - if (commandType != CMD_SELECT && commandType != CMD_INSERT && - commandType != CMD_UPDATE && commandType != CMD_DELETE) - { - return false; - } - - /* - * We can handle INSERT INTO distributed_table SELECT ... even if the SELECT - * part references local tables, so skip the remaining checks. - */ - if (InsertSelectIntoDistributedTable(queryTree)) - { - return true; - } - - /* extract range table entries for simple relations only */ - ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList); - - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* check if relation is local or distributed */ - Oid relationId = rangeTableEntry->relid; - - if (IsDistributedTable(relationId)) - { - hasDistributedRelation = true; - } - else - { - hasLocalRelation = true; - } - } - - if (hasLocalRelation && hasDistributedRelation) - { - if (InsertSelectIntoLocalTable(queryTree)) - { - ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " - "local table"))); - } - ereport(ERROR, (errmsg("cannot plan queries which include both local and " - "distributed relations"))); - } - - return hasDistributedRelation; -} - - /* * ExtractRangeTableRelationWalker gathers all range table relation entries * in a query. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index d4fb9304d..932b4b6ec 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2485,8 +2485,15 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC { /* only hash partitioned tables are supported */ Oid distributedTableId = rte->relid; - char partitionMethod = PartitionMethod(distributedTableId); + char partitionMethod = 0; + if (!IsDistributedTable(distributedTableId)) + { + /* local tables cannot be read from workers */ + return false; + } + + partitionMethod = PartitionMethod(distributedTableId); if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE)) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f97679969..07d0aad18 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -73,6 +73,7 @@ typedef struct RelationShard extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 937f6ca38..68b189d1f 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -192,7 +192,6 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( Query *query); extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail); extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); -extern bool NeedsDistributedPlanning(Query *queryTree); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index 80e6963a2..3436c4bbe 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -7,6 +7,7 @@ CREATE TABLE test_table_1(id int); INSERT INTO test_table_1 SELECT user_id FROM users_table; ERROR: cannot INSERT rows from a distributed query into a local table +HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table. DROP TABLE test_table_1; ------------------------------------ ------------------------------------ diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index adf8ebf57..f2792ddf7 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -295,7 +295,7 @@ CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation bidders is not distributed -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -429,7 +429,7 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation bidders is not distributed -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 55191357b..dbf4c3c1d 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -165,7 +165,7 @@ CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation bidders is not distributed -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) DELETE FROM limit_orders_mx; @@ -235,7 +235,7 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation bidders is not distributed -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) UPDATE limit_orders_mx SET symbol = 'GM'; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 4a2c82bb9..04abdf5ee 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -175,14 +175,14 @@ HINT: Consider using an equality filter on the distributed table's partition co -- queries using CTEs are unsupported WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) SELECT title FROM articles; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- queries which involve functions in FROM clause are unsupported. SELECT * FROM articles, position('om' in 'Thomas'); ERROR: could not run distributed query with complex table expressions HINT: Consider using an equality filter on the distributed table's partition column. -- subqueries are not supported in WHERE clause in Citus SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- subqueries are supported in FROM clause SELECT articles.id,test.word_count FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id @@ -248,10 +248,10 @@ ERROR: could not run distributed query with subquery outside the FROM and WHERE HINT: Consider using an equality filter on the distributed table's partition column. -- joins are not supported between local and distributed tables SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- inner joins are not supported (I think) SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id); -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- test use of EXECUTE statements within plpgsql DO $sharded_execute$ BEGIN diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index 3ad9550c4..ed0607a47 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -183,6 +183,7 @@ SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = (1 row) SET citus.task_executor_type to DEFAULT; +-- materialized views work -- insert into... select works with views CREATE TABLE temp_lineitem(LIKE lineitem_hash_part); SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part'); @@ -206,6 +207,35 @@ SELECT count(*) FROM temp_lineitem; 1706 (1 row) +-- can create and query materialized views +CREATE MATERIALIZED VIEW mode_counts +AS SELECT l_shipmode, count(*) FROM temp_lineitem GROUP BY l_shipmode; +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + l_shipmode | count +------------+------- + AIR | 1706 +(1 row) + +-- materialized views are local, cannot join with distributed tables +SELECT count(*) FROM mode_counts JOIN temp_lineitem USING (l_shipmode); +ERROR: relation mode_counts is not distributed +-- new data is not immediately reflected in the view +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems; +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + l_shipmode | count +------------+------- + AIR | 1706 +(1 row) + +-- refresh updates the materialised view with new data +REFRESH MATERIALIZED VIEW mode_counts; +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + l_shipmode | count +------------+------- + AIR | 3412 +(1 row) + +DROP MATERIALIZED VIEW mode_counts; SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported SELECT l_suppkey, count(*) FROM diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index 9a67c2866..1bd54bcc0 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -86,6 +86,7 @@ SET citus.task_executor_type to "task-tracker"; SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); SET citus.task_executor_type to DEFAULT; +-- materialized views work -- insert into... select works with views CREATE TABLE temp_lineitem(LIKE lineitem_hash_part); SELECT create_distributed_table('temp_lineitem', 'l_orderkey', 'hash', 'lineitem_hash_part'); @@ -95,6 +96,25 @@ SELECT count(*) FROM temp_lineitem; INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems WHERE l_shipmode = 'MAIL'; SELECT count(*) FROM temp_lineitem; +-- can create and query materialized views +CREATE MATERIALIZED VIEW mode_counts +AS SELECT l_shipmode, count(*) FROM temp_lineitem GROUP BY l_shipmode; + +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + +-- materialized views are local, cannot join with distributed tables +SELECT count(*) FROM mode_counts JOIN temp_lineitem USING (l_shipmode); + +-- new data is not immediately reflected in the view +INSERT INTO temp_lineitem SELECT * FROM air_shipped_lineitems; +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + +-- refresh updates the materialised view with new data +REFRESH MATERIALIZED VIEW mode_counts; +SELECT * FROM mode_counts WHERE l_shipmode = 'AIR' ORDER BY 2 DESC, 1 LIMIT 10; + +DROP MATERIALIZED VIEW mode_counts; + SET citus.task_executor_type to "task-tracker"; -- single view repartition subqueries are not supported