From d2bac081e84d53468d87d6fa0e995fced5052bb8 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Dec 2017 17:01:10 +0100 Subject: [PATCH] Allow queries with local tables in NeedsDistributedPlanning --- .../distributed/planner/distributed_planner.c | 113 +++++++++++++++++- .../planner/multi_logical_planner.c | 65 ---------- .../planner/multi_router_planner.c | 9 +- src/include/distributed/distributed_planner.h | 1 + .../distributed/multi_logical_planner.h | 1 - .../regress/expected/multi_modifications.out | 4 +- .../expected/multi_mx_modifications.out | 4 +- .../regress/expected/multi_simple_queries.out | 8 +- 8 files changed, 129 insertions(+), 76 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index e8a294528..5978ce04a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -12,8 +12,8 @@ #include #include +#include "catalog/pg_class.h" #include "catalog/pg_type.h" - #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" #include "distributed/insert_select_planner.h" @@ -41,6 +41,7 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log /* local function forward declarations */ +static bool NeedsDistributedPlanningWalker(Node *node, void *context); static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, PlannerRestrictionContext * @@ -147,6 +148,116 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } +/* + * NeedsDistributedPlanning returns true if the Citus extension is loaded and + * the query contains a distributed table. It also errors out for a few + * unsupported cases, namely: + * + * - INSERT INTO SELECT ... + * - Subqueries that directly join a local table and a distributed table. + */ +bool +NeedsDistributedPlanning(Query *query) +{ + CmdType commandType = query->commandType; + if (commandType != CMD_SELECT && commandType != CMD_INSERT && + commandType != CMD_UPDATE && commandType != CMD_DELETE) + { + return false; + } + + if (!CitusHasBeenLoaded()) + { + return false; + } + + /* + * We can handle INSERT INTO distributed_table SELECT ... even if the SELECT + * part references local tables, so skip the remaining checks. + */ + if (InsertSelectIntoDistributedTable(query)) + { + return true; + } + + if (!NeedsDistributedPlanningWalker((Node *) query, NULL)) + { + return false; + } + + if (InsertSelectIntoLocalTable(query)) + { + ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " + "local table"))); + } + + return true; +} + + +/* + * NeedsDistributedPlanningWalker checks if the query contains any distributed + * tables. Additionally, it errors out if there is a join between a local and + * a distributed table. + */ +static bool +NeedsDistributedPlanningWalker(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + Query *query = (Query *) node; + ListCell *rangeTableCell = NULL; + bool hasLocalRelation = false; + bool hasDistributedRelation = false; + + foreach(rangeTableCell, query->rtable) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + Oid relationId = InvalidOid; + + if (rangeTableEntry->rtekind != RTE_RELATION || + rangeTableEntry->relkind == RELKIND_VIEW) + { + /* only consider tables */ + continue; + } + + relationId = rangeTableEntry->relid; + if (IsDistributedTable(relationId)) + { + hasDistributedRelation = true; + } + else + { + hasLocalRelation = true; + } + } + + if (hasLocalRelation && hasDistributedRelation) + { + ereport(ERROR, (errmsg("cannot plan queries which join local and " + "distributed relations"))); + } + + if (hasDistributedRelation) + { + return true; + } + + return query_tree_walker(query, NeedsDistributedPlanningWalker, NULL, 0); + } + else + { + return expression_tree_walker(node, NeedsDistributedPlanningWalker, NULL); + } +} + + /* * AssignRTEIdentities function modifies query tree by adding RTE identities to the * RTE_RELATIONs. diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index ae38ded87..faf6e7a68 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -3478,71 +3478,6 @@ FindNodesOfType(MultiNode *node, int type) } -/* - * NeedsDistributedPlanning checks if the passed in query is a query running - * on a distributed table. If it is, we start distributed planning. - * - * For distributed relations it also assigns identifiers to the relevant RTEs. - */ -bool -NeedsDistributedPlanning(Query *queryTree) -{ - CmdType commandType = queryTree->commandType; - List *rangeTableList = NIL; - ListCell *rangeTableCell = NULL; - bool hasLocalRelation = false; - bool hasDistributedRelation = false; - - if (commandType != CMD_SELECT && commandType != CMD_INSERT && - commandType != CMD_UPDATE && commandType != CMD_DELETE) - { - return false; - } - - /* - * We can handle INSERT INTO distributed_table SELECT ... even if the SELECT - * part references local tables, so skip the remaining checks. - */ - if (InsertSelectIntoDistributedTable(queryTree)) - { - return true; - } - - /* extract range table entries for simple relations only */ - ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList); - - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* check if relation is local or distributed */ - Oid relationId = rangeTableEntry->relid; - - if (IsDistributedTable(relationId)) - { - hasDistributedRelation = true; - } - else - { - hasLocalRelation = true; - } - } - - if (hasLocalRelation && hasDistributedRelation) - { - if (InsertSelectIntoLocalTable(queryTree)) - { - ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " - "local table"))); - } - ereport(ERROR, (errmsg("cannot plan queries which include both local and " - "distributed relations"))); - } - - return hasDistributedRelation; -} - - /* * ExtractRangeTableRelationWalker gathers all range table relation entries * in a query. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index d4fb9304d..932b4b6ec 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2485,8 +2485,15 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC { /* only hash partitioned tables are supported */ Oid distributedTableId = rte->relid; - char partitionMethod = PartitionMethod(distributedTableId); + char partitionMethod = 0; + if (!IsDistributedTable(distributedTableId)) + { + /* local tables cannot be read from workers */ + return false; + } + + partitionMethod = PartitionMethod(distributedTableId); if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE)) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f97679969..07d0aad18 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -73,6 +73,7 @@ typedef struct RelationShard extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 937f6ca38..68b189d1f 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -192,7 +192,6 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( Query *query); extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail); extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); -extern bool NeedsDistributedPlanning(Query *queryTree); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index adf8ebf57..3a9f755ef 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -295,7 +295,7 @@ CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -429,7 +429,7 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 55191357b..05e6e2ef5 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -165,7 +165,7 @@ CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) DELETE FROM limit_orders_mx; @@ -235,7 +235,7 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) UPDATE limit_orders_mx SET symbol = 'GM'; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 4a2c82bb9..923633244 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -175,14 +175,14 @@ HINT: Consider using an equality filter on the distributed table's partition co -- queries using CTEs are unsupported WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) SELECT title FROM articles; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- queries which involve functions in FROM clause are unsupported. SELECT * FROM articles, position('om' in 'Thomas'); ERROR: could not run distributed query with complex table expressions HINT: Consider using an equality filter on the distributed table's partition column. -- subqueries are not supported in WHERE clause in Citus SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); -ERROR: cannot plan queries which include both local and distributed relations +ERROR: relation authors is not distributed -- subqueries are supported in FROM clause SELECT articles.id,test.word_count FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id @@ -248,10 +248,10 @@ ERROR: could not run distributed query with subquery outside the FROM and WHERE HINT: Consider using an equality filter on the distributed table's partition column. -- joins are not supported between local and distributed tables SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id; -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- inner joins are not supported (I think) SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id); -ERROR: cannot plan queries which include both local and distributed relations +ERROR: cannot plan queries which join local and distributed relations -- test use of EXECUTE statements within plpgsql DO $sharded_execute$ BEGIN