mirror of https://github.com/citusdata/citus.git
Improve error checks for subquery pushdown and INSERT ... SELECT
Since the previous commit allows subqueries in the WHERE clause, we should apply the same limitations to those subqueries. With this commit, we no longer iterate over each subquery one by one. Instead, we extract all the subqueries and apply the checks directly to them. The aim of this change is to (i) simplify the code and (ii) bring it closer to the checks in the INSERT ... SELECT code base.
pull/1342/head
parent
6f4ee50324
commit
f4c48134f8
|
@ -2978,12 +2978,12 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
|
|||
|
||||
/*
|
||||
* ExtractQueryWalker walks over a query, and finds all queries in the query
|
||||
* tree and returns these queries.
|
||||
* tree and returns these queries. Note that the function also recurses into
|
||||
* the subqueries in WHERE clause.
|
||||
*/
|
||||
bool
|
||||
ExtractQueryWalker(Node *node, List **queryList)
|
||||
{
|
||||
bool walkerResult = false;
|
||||
if (node == NULL)
|
||||
{
|
||||
return false;
|
||||
|
@ -2994,11 +2994,10 @@ ExtractQueryWalker(Node *node, List **queryList)
|
|||
Query *query = (Query *) node;
|
||||
|
||||
(*queryList) = lappend(*queryList, query);
|
||||
walkerResult = query_tree_walker(query, ExtractQueryWalker, queryList,
|
||||
QTW_EXAMINE_RTES);
|
||||
return query_tree_walker(query, ExtractQueryWalker, queryList, 0);
|
||||
}
|
||||
|
||||
return walkerResult;
|
||||
return expression_tree_walker(node, ExtractQueryWalker, queryList);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -394,11 +394,9 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
|
|||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext)
|
||||
{
|
||||
ListCell *rangeTableEntryCell = NULL;
|
||||
ListCell *sublinkCell = NULL;
|
||||
List *subqueryEntryList = NIL;
|
||||
bool outerMostQueryHasLimit = false;
|
||||
List *sublinkList = NIL;
|
||||
ListCell *subqueryCell = NULL;
|
||||
List *subqueryList = NIL;
|
||||
DeferredErrorMessage *error = NULL;
|
||||
RelationRestrictionContext *relationRestrictionContext =
|
||||
plannerRestrictionContext->relationRestrictionContext;
|
||||
|
@ -437,13 +435,20 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
|
|||
"equality operator.", NULL);
|
||||
}
|
||||
|
||||
subqueryEntryList = SubqueryEntryList(originalQuery);
|
||||
foreach(rangeTableEntryCell, subqueryEntryList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = lfirst(rangeTableEntryCell);
|
||||
Query *subquery = rangeTableEntry->subquery;
|
||||
/*
|
||||
* We first extract all the queries that appear in the original query. Later,
|
||||
* we delete the original query given that error rules do not apply to the
|
||||
* top level query.
|
||||
*/
|
||||
ExtractQueryWalker((Node *) originalQuery, &subqueryList);
|
||||
subqueryList = list_delete(subqueryList, originalQuery);
|
||||
|
||||
error = DeferErrorIfCannotPushdownSubquery(subquery, outerMostQueryHasLimit);
|
||||
/* iterate on the subquery list and error out accordingly */
|
||||
foreach(subqueryCell, subqueryList)
|
||||
{
|
||||
Query *subquery = lfirst(subqueryCell);
|
||||
error = DeferErrorIfCannotPushdownSubquery(subquery,
|
||||
outerMostQueryHasLimit);
|
||||
if (error)
|
||||
{
|
||||
return error;
|
||||
|
@ -456,23 +461,6 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
|
|||
}
|
||||
}
|
||||
|
||||
sublinkList = SublinkList(originalQuery);
|
||||
foreach(sublinkCell, sublinkList)
|
||||
{
|
||||
SubLink *sublink = (SubLink *) lfirst(sublinkCell);
|
||||
Node *subselect = sublink->subselect;
|
||||
|
||||
if (subselect && IsA(subselect, Query))
|
||||
{
|
||||
error = DeferErrorIfCannotPushdownSubquery((Query *) subselect,
|
||||
outerMostQueryHasLimit);
|
||||
if (error)
|
||||
{
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -489,6 +477,18 @@ DeferErrorIfUnsupportedFilters(Query *subquery)
|
|||
ListCell *queryCell = NULL;
|
||||
List *subqueryOpExpressionList = NIL;
|
||||
List *relationIdList = RelationIdList(subquery);
|
||||
Var *partitionColumn = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
|
||||
/*
|
||||
* If there are no appropriate relations, we're going to error out on
|
||||
* DeferErrorIfCannotPushdownSubquery(). It may happen once the subquery
|
||||
* does not include a relation.
|
||||
*/
|
||||
if (relationIdList == NIL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get relation id of any relation in the subquery and create partition column
|
||||
|
@ -496,8 +496,8 @@ DeferErrorIfUnsupportedFilters(Query *subquery)
|
|||
* expressions on different tables. Then we compare these operator expressions
|
||||
* to see if they consist of same operator and constant value.
|
||||
*/
|
||||
Oid relationId = linitial_oid(relationIdList);
|
||||
Var *partitionColumn = PartitionColumn(relationId, 0);
|
||||
relationId = linitial_oid(relationIdList);
|
||||
partitionColumn = PartitionColumn(relationId, 0);
|
||||
|
||||
ExtractQueryWalker((Node *) subquery, &queryList);
|
||||
foreach(queryCell, queryList)
|
||||
|
@ -617,8 +617,6 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
|
|||
{
|
||||
bool preconditionsSatisfied = true;
|
||||
char *errorDetail = NULL;
|
||||
List *subqueryEntryList = NIL;
|
||||
ListCell *rangeTableEntryCell = NULL;
|
||||
DeferredErrorMessage *deferredError = NULL;
|
||||
|
||||
deferredError = DeferErrorIfUnsupportedTableCombination(subqueryTree);
|
||||
|
@ -748,41 +746,20 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
|
|||
errorDetail, NULL);
|
||||
}
|
||||
|
||||
/* recursively do same check for subqueries of this query */
|
||||
subqueryEntryList = SubqueryEntryList(subqueryTree);
|
||||
foreach(rangeTableEntryCell, subqueryEntryList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry =
|
||||
(RangeTblEntry *) lfirst(rangeTableEntryCell);
|
||||
|
||||
Query *innerSubquery = rangeTableEntry->subquery;
|
||||
deferredError = DeferErrorIfCannotPushdownSubquery(innerSubquery,
|
||||
outerMostQueryHasLimit);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeferErrorIfUnsupportedUnionQuery is a helper function for ErrorIfCannotPushdownSubquery().
|
||||
* It basically iterates over the subqueries that reside under the given set operations.
|
||||
*
|
||||
* The function also errors out for set operations INTERSECT and EXCEPT.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
|
||||
bool outerMostQueryHasLimit)
|
||||
{
|
||||
List *rangeTableIndexList = NIL;
|
||||
ListCell *rangeTableIndexCell = NULL;
|
||||
List *setOperationStatementList = NIL;
|
||||
ListCell *setOperationStatmentCell = NULL;
|
||||
List *rangeTableList = subqueryTree->rtable;
|
||||
|
||||
ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations,
|
||||
&setOperationStatementList);
|
||||
|
@ -799,24 +776,6 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
|
|||
}
|
||||
}
|
||||
|
||||
ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations,
|
||||
&rangeTableIndexList);
|
||||
foreach(rangeTableIndexCell, rangeTableIndexList)
|
||||
{
|
||||
int rangeTableIndex = lfirst_int(rangeTableIndexCell);
|
||||
RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList);
|
||||
DeferredErrorMessage *deferredError = NULL;
|
||||
|
||||
Assert(rangeTableEntry->rtekind == RTE_SUBQUERY);
|
||||
|
||||
deferredError = DeferErrorIfCannotPushdownSubquery(rangeTableEntry->subquery,
|
||||
outerMostQueryHasLimit);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -3038,8 +3038,6 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
|
|||
|
||||
/* not copyable, but readonly */
|
||||
newRestriction->plannerInfo = oldRestriction->plannerInfo;
|
||||
newRestriction->prunedShardIntervalList =
|
||||
copyObject(oldRestriction->prunedShardIntervalList);
|
||||
|
||||
newContext->relationRestrictionList =
|
||||
lappend(newContext->relationRestrictionList, newRestriction);
|
||||
|
|
|
@ -130,7 +130,7 @@ FROM
|
|||
raw_events_first
|
||||
WHERE
|
||||
user_id < 0;
|
||||
NOTICE: evaluating on master
|
||||
ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries
|
||||
-- make sure we don't evaluate stable functions with column arguments
|
||||
CREATE OR REPLACE FUNCTION evaluate_on_master(x int)
|
||||
RETURNS int LANGUAGE plpgsql STABLE
|
||||
|
@ -672,11 +672,7 @@ INSERT INTO agg_events
|
|||
raw_events_first.user_id, (SELECT * FROM sub_cte)
|
||||
FROM
|
||||
raw_events_first;
|
||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) WITH sub_cte AS (SELECT 1) SELECT user_id, (SELECT sub_cte."?column?" FROM sub_cte) FROM public.raw_events_first_13300000 raw_events_first WHERE ((worker_hash(user_id) >= '-2147483648'::integer) AND (worker_hash(user_id) <= '-1073741825'::integer))
|
||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) WITH sub_cte AS (SELECT 1) SELECT user_id, (SELECT sub_cte."?column?" FROM sub_cte) FROM public.raw_events_first_13300001 raw_events_first WHERE ((worker_hash(user_id) >= '-1073741824'::integer) AND (worker_hash(user_id) <= '-1'::integer))
|
||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) WITH sub_cte AS (SELECT 1) SELECT user_id, (SELECT sub_cte."?column?" FROM sub_cte) FROM public.raw_events_first_13300002 raw_events_first WHERE ((worker_hash(user_id) >= 0) AND (worker_hash(user_id) <= 1073741823))
|
||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) WITH sub_cte AS (SELECT 1) SELECT user_id, (SELECT sub_cte."?column?" FROM sub_cte) FROM public.raw_events_first_13300003 raw_events_first WHERE ((worker_hash(user_id) >= 1073741824) AND (worker_hash(user_id) <= 2147483647))
|
||||
DEBUG: Plan is router executable
|
||||
ERROR: Subqueries without relations are not allowed in INSERT ... SELECT queries
|
||||
-- We do not support any set operations
|
||||
INSERT INTO
|
||||
raw_events_first(user_id)
|
||||
|
@ -1581,6 +1577,30 @@ ERROR: grouping sets are not allowed in INSERT ... SELECT queries
|
|||
SET client_min_messages TO INFO;
|
||||
-- avoid constraint violations
|
||||
TRUNCATE raw_events_first;
|
||||
-- we don't support LIMIT even if it exists in the subqueries
|
||||
-- in where clause
|
||||
INSERT INTO agg_events(user_id)
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
WHERE user_id
|
||||
IN (SELECT
|
||||
user_id
|
||||
FROM (
|
||||
(
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id LIMIT 3
|
||||
) as f_inner
|
||||
)
|
||||
) AS f2);
|
||||
ERROR: LIMIT clauses are not allowed in INSERT ... SELECT queries
|
||||
-- Altering a table and selecting from it using a multi-shard statement
|
||||
-- in the same transaction is allowed because we will use the same
|
||||
-- connections for all co-located placements.
|
||||
|
|
|
@ -2438,7 +2438,7 @@ FROM
|
|||
FROM
|
||||
(SELECT
|
||||
1 as user_id, now(), 3 AS event
|
||||
) events_subquery_4) OFFSET 3) t1
|
||||
) events_subquery_4)) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
INNER JOIN
|
||||
(SELECT
|
||||
|
@ -2498,7 +2498,7 @@ FROM
|
|||
FROM
|
||||
(SELECT
|
||||
1 as user_id, now(), 3 AS event
|
||||
) events_subquery_4) OFFSET 3) t1
|
||||
) events_subquery_4)) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
INNER JOIN
|
||||
(SELECT random()::int as user_id) AS t
|
||||
|
|
|
@ -530,6 +530,35 @@ WHERE
|
|||
);
|
||||
ERROR: cannot push down this subquery
|
||||
DETAIL: Offset clause is currently unsupported
|
||||
-- we can detect errors even if they appear in WHERE -> FROM -> WHERE
|
||||
-- subqueries
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
WHERE user_id
|
||||
IN (SELECT
|
||||
f_inner.user_id
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id
|
||||
) as f_inner,
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id
|
||||
AND e1.user_id IN (SELECT user_id FROM users_table LIMIT 3 )
|
||||
) as f_outer
|
||||
WHERE f_inner.user_id = f_outer.user_id
|
||||
);
|
||||
ERROR: cannot push down this subquery
|
||||
DETAIL: Limit in subquery is currently unsupported
|
||||
-- semi join is not on the partition key for the third subquery
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
|
|
|
@ -1311,6 +1311,30 @@ SET client_min_messages TO INFO;
|
|||
-- avoid constraint violations
|
||||
TRUNCATE raw_events_first;
|
||||
|
||||
-- we don't support LIMIT even if it exists in the subqueries
|
||||
-- in where clause
|
||||
INSERT INTO agg_events(user_id)
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
WHERE user_id
|
||||
IN (SELECT
|
||||
user_id
|
||||
FROM (
|
||||
(
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id LIMIT 3
|
||||
) as f_inner
|
||||
)
|
||||
) AS f2);
|
||||
|
||||
-- Altering a table and selecting from it using a multi-shard statement
|
||||
-- in the same transaction is allowed because we will use the same
|
||||
-- connections for all co-located placements.
|
||||
|
|
|
@ -2216,7 +2216,7 @@ FROM
|
|||
FROM
|
||||
(SELECT
|
||||
1 as user_id, now(), 3 AS event
|
||||
) events_subquery_4) OFFSET 3) t1
|
||||
) events_subquery_4)) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
INNER JOIN
|
||||
(SELECT
|
||||
|
@ -2275,7 +2275,7 @@ FROM
|
|||
FROM
|
||||
(SELECT
|
||||
1 as user_id, now(), 3 AS event
|
||||
) events_subquery_4) OFFSET 3) t1
|
||||
) events_subquery_4)) t1
|
||||
GROUP BY "t1"."user_id") AS t) "q"
|
||||
INNER JOIN
|
||||
(SELECT random()::int as user_id) AS t
|
||||
|
|
|
@ -462,6 +462,34 @@ WHERE
|
|||
OFFSET 3
|
||||
);
|
||||
|
||||
-- we can detect errors even if they appear in WHERE -> FROM -> WHERE
|
||||
-- subqueries
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
WHERE user_id
|
||||
IN (SELECT
|
||||
f_inner.user_id
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id
|
||||
) as f_inner,
|
||||
(
|
||||
SELECT
|
||||
e1.user_id
|
||||
FROM
|
||||
users_table u1, events_table e1
|
||||
WHERE
|
||||
e1.user_id = u1.user_id
|
||||
AND e1.user_id IN (SELECT user_id FROM users_table LIMIT 3 )
|
||||
) as f_outer
|
||||
WHERE f_inner.user_id = f_outer.user_id
|
||||
);
|
||||
|
||||
-- semi join is not on the partition key for the third subquery
|
||||
SELECT user_id
|
||||
FROM users_table
|
||||
|
|
Loading…
Reference in New Issue