Allow queries with local tables in NeedsDistributedPlanning

pull/1857/head
Marco Slot 2017-12-04 17:01:10 +01:00
parent 3ceb15ccdf
commit d2bac081e8
8 changed files with 129 additions and 76 deletions

View File

@ -12,8 +12,8 @@
#include <float.h> #include <float.h>
#include <limits.h> #include <limits.h>
#include "catalog/pg_class.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
@ -41,6 +41,7 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log
/* local function forward declarations */ /* local function forward declarations */
static bool NeedsDistributedPlanningWalker(Node *node, void *context);
static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams, Query *query, ParamListInfo boundParams,
PlannerRestrictionContext * PlannerRestrictionContext *
@ -147,6 +148,116 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
/*
* NeedsDistributedPlanning returns true if the Citus extension is loaded and
* the query contains a distributed table. It also errors out for a few
* unsupported cases, namely:
*
* - INSERT INTO <local table> SELECT ... <distributed table>
* - Subqueries that directly join a local table and a distributed table.
*/
bool
NeedsDistributedPlanning(Query *query)
{
CmdType commandType = query->commandType;
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE)
{
return false;
}
if (!CitusHasBeenLoaded())
{
return false;
}
/*
* We can handle INSERT INTO distributed_table SELECT ... even if the SELECT
* part references local tables, so skip the remaining checks.
*/
if (InsertSelectIntoDistributedTable(query))
{
return true;
}
if (!NeedsDistributedPlanningWalker((Node *) query, NULL))
{
return false;
}
if (InsertSelectIntoLocalTable(query))
{
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
"local table")));
}
return true;
}
/*
* NeedsDistributedPlanningWalker checks if the query contains any distributed
* tables. Additionally, it errors out if there is a join between a local and
* a distributed table.
*/
static bool
NeedsDistributedPlanningWalker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}
if (IsA(node, Query))
{
Query *query = (Query *) node;
ListCell *rangeTableCell = NULL;
bool hasLocalRelation = false;
bool hasDistributedRelation = false;
foreach(rangeTableCell, query->rtable)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
Oid relationId = InvalidOid;
if (rangeTableEntry->rtekind != RTE_RELATION ||
rangeTableEntry->relkind == RELKIND_VIEW)
{
/* only consider tables */
continue;
}
relationId = rangeTableEntry->relid;
if (IsDistributedTable(relationId))
{
hasDistributedRelation = true;
}
else
{
hasLocalRelation = true;
}
}
if (hasLocalRelation && hasDistributedRelation)
{
ereport(ERROR, (errmsg("cannot plan queries which join local and "
"distributed relations")));
}
if (hasDistributedRelation)
{
return true;
}
return query_tree_walker(query, NeedsDistributedPlanningWalker, NULL, 0);
}
else
{
return expression_tree_walker(node, NeedsDistributedPlanningWalker, NULL);
}
}
/* /*
* AssignRTEIdentities function modifies query tree by adding RTE identities to the * AssignRTEIdentities function modifies query tree by adding RTE identities to the
* RTE_RELATIONs. * RTE_RELATIONs.

View File

@ -3478,71 +3478,6 @@ FindNodesOfType(MultiNode *node, int type)
} }
/*
* NeedsDistributedPlanning checks if the passed in query is a query running
* on a distributed table. If it is, we start distributed planning.
*
* For distributed relations it also assigns identifiers to the relevant RTEs.
*/
bool
NeedsDistributedPlanning(Query *queryTree)
{
CmdType commandType = queryTree->commandType;
List *rangeTableList = NIL;
ListCell *rangeTableCell = NULL;
bool hasLocalRelation = false;
bool hasDistributedRelation = false;
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE)
{
return false;
}
/*
* We can handle INSERT INTO distributed_table SELECT ... even if the SELECT
* part references local tables, so skip the remaining checks.
*/
if (InsertSelectIntoDistributedTable(queryTree))
{
return true;
}
/* extract range table entries for simple relations only */
ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* check if relation is local or distributed */
Oid relationId = rangeTableEntry->relid;
if (IsDistributedTable(relationId))
{
hasDistributedRelation = true;
}
else
{
hasLocalRelation = true;
}
}
if (hasLocalRelation && hasDistributedRelation)
{
if (InsertSelectIntoLocalTable(queryTree))
{
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
"local table")));
}
ereport(ERROR, (errmsg("cannot plan queries which include both local and "
"distributed relations")));
}
return hasDistributedRelation;
}
/* /*
* ExtractRangeTableRelationWalker gathers all range table relation entries * ExtractRangeTableRelationWalker gathers all range table relation entries
* in a query. * in a query.

View File

@ -2485,8 +2485,15 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC
{ {
/* only hash partitioned tables are supported */ /* only hash partitioned tables are supported */
Oid distributedTableId = rte->relid; Oid distributedTableId = rte->relid;
char partitionMethod = PartitionMethod(distributedTableId); char partitionMethod = 0;
if (!IsDistributedTable(distributedTableId))
{
/* local tables cannot be read from workers */
return false;
}
partitionMethod = PartitionMethod(distributedTableId);
if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE)) DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE))
{ {

View File

@ -73,6 +73,7 @@ typedef struct RelationShard
extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
ParamListInfo boundParams); ParamListInfo boundParams);
extern bool NeedsDistributedPlanning(Query *query);
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte); Index index, RangeTblEntry *rte);

View File

@ -192,7 +192,6 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
Query *query); Query *query);
extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail); extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail);
extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
extern bool NeedsDistributedPlanning(Query *queryTree);
extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);

View File

@ -295,7 +295,7 @@ CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders; DELETE FROM limit_orders;
@ -429,7 +429,7 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
UPDATE limit_orders SET symbol = 'GM'; UPDATE limit_orders SET symbol = 'GM';

View File

@ -165,7 +165,7 @@ CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders_mx; DELETE FROM limit_orders_mx;
@ -235,7 +235,7 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders
WHERE limit_orders_mx.id = 246 AND WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *)
UPDATE limit_orders_mx SET symbol = 'GM'; UPDATE limit_orders_mx SET symbol = 'GM';

View File

@ -175,14 +175,14 @@ HINT: Consider using an equality filter on the distributed table's partition co
-- queries using CTEs are unsupported -- queries using CTEs are unsupported
WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 )
SELECT title FROM articles; SELECT title FROM articles;
ERROR: cannot plan queries which include both local and distributed relations ERROR: relation authors is not distributed
-- queries which involve functions in FROM clause are unsupported. -- queries which involve functions in FROM clause are unsupported.
SELECT * FROM articles, position('om' in 'Thomas'); SELECT * FROM articles, position('om' in 'Thomas');
ERROR: could not run distributed query with complex table expressions ERROR: could not run distributed query with complex table expressions
HINT: Consider using an equality filter on the distributed table's partition column. HINT: Consider using an equality filter on the distributed table's partition column.
-- subqueries are not supported in WHERE clause in Citus -- subqueries are not supported in WHERE clause in Citus
SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a');
ERROR: cannot plan queries which include both local and distributed relations ERROR: relation authors is not distributed
-- subqueries are supported in FROM clause -- subqueries are supported in FROM clause
SELECT articles.id,test.word_count SELECT articles.id,test.word_count
FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id
@ -248,10 +248,10 @@ ERROR: could not run distributed query with subquery outside the FROM and WHERE
HINT: Consider using an equality filter on the distributed table's partition column. HINT: Consider using an equality filter on the distributed table's partition column.
-- joins are not supported between local and distributed tables -- joins are not supported between local and distributed tables
SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id; SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id;
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- inner joins are not supported (I think) -- inner joins are not supported (I think)
SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id); SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id);
ERROR: cannot plan queries which include both local and distributed relations ERROR: cannot plan queries which join local and distributed relations
-- test use of EXECUTE statements within plpgsql -- test use of EXECUTE statements within plpgsql
DO $sharded_execute$ DO $sharded_execute$
BEGIN BEGIN