Merge pull request #1840 from citusdata/remove_filter_checks

Remove filter checks on leaf queries
pull/1841/head
Marco Slot 2017-11-30 12:52:11 +01:00 committed by GitHub
commit 581c8c02cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 141 additions and 325 deletions

View File

@ -3271,185 +3271,6 @@ ExtractQueryWalker(Node *node, List **queryList)
}
/*
* LeafQuery checks if the given query is a leaf query. Leaf queries have only
* simple relations in the join tree.
*/
bool
LeafQuery(Query *queryTree)
{
List *rangeTableList = queryTree->rtable;
List *joinTreeTableIndexList = NIL;
ListCell *joinTreeTableIndexCell = NULL;
bool leafQuery = true;
/*
* Extract all range table indexes from the join tree. Note that sub-queries
* that get pulled up by PostgreSQL don't appear in this join tree.
*/
ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList);
foreach(joinTreeTableIndexCell, joinTreeTableIndexList)
{
/*
* Join tree's range table index starts from 1 in the query tree. But,
* list indexes start from 0.
*/
int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell);
int rangeTableListIndex = joinTreeTableIndex - 1;
RangeTblEntry *rangeTableEntry =
(RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex);
/*
* Check if the range table in the join tree is a simple relation.
*/
if (rangeTableEntry->rtekind != RTE_RELATION)
{
leafQuery = false;
}
}
return leafQuery;
}
/*
* PartitionColumnOpExpressionList returns operator expressions which are on
* partition column in the query. This function walks over where clause list,
* finds operator expressions on partition column and returns them in a new list.
*/
List *
PartitionColumnOpExpressionList(Query *query)
{
List *whereClauseList = WhereClauseList(query->jointree);
List *partitionColumnOpExpressionList = NIL;
ListCell *whereClauseCell = NULL;
foreach(whereClauseCell, whereClauseList)
{
Node *whereNode = (Node *) lfirst(whereClauseCell);
Node *leftArgument = NULL;
Node *rightArgument = NULL;
Node *strippedLeftArgument = NULL;
Node *strippedRightArgument = NULL;
OpExpr *whereClause = NULL;
List *argumentList = NIL;
List *rangetableList = NIL;
uint32 argumentCount = 0;
Var *candidatePartitionColumn = NULL;
Var *partitionColumn = NULL;
Index rangeTableEntryIndex = 0;
RangeTblEntry *rangeTableEntry = NULL;
Oid relationId = InvalidOid;
if (!IsA(whereNode, OpExpr))
{
continue;
}
whereClause = (OpExpr *) whereNode;
argumentList = whereClause->args;
/*
* Select clauses must have two arguments. Note that logic here use to
* find select clauses is very similar to IsSelectClause(). But we are
* not able to reuse it, because it calls pull_var_clause_default()
* which in return deep down calls pull_var_clause_walker(), and this
* function errors out for variable level other than 0 which is the case
* for lateral joins.
*/
argumentCount = list_length(argumentList);
if (argumentCount != 2)
{
continue;
}
leftArgument = (Node *) linitial(argumentList);
rightArgument = (Node *) lsecond(argumentList);
strippedLeftArgument = strip_implicit_coercions(leftArgument);
strippedRightArgument = strip_implicit_coercions(rightArgument);
if (IsA(strippedLeftArgument, Var) && IsA(strippedRightArgument, Const))
{
candidatePartitionColumn = (Var *) strippedLeftArgument;
}
else if (IsA(strippedLeftArgument, Const) && IsA(strippedRightArgument, Var))
{
candidatePartitionColumn = (Var *) strippedRightArgument;
}
else
{
continue;
}
rangetableList = query->rtable;
rangeTableEntryIndex = candidatePartitionColumn->varno - 1;
rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex);
/*
* We currently don't support checking for equality when user refers
* to a column from the JOIN instead of the relation.
*/
if (rangeTableEntry->rtekind != RTE_RELATION)
{
continue;
}
relationId = rangeTableEntry->relid;
partitionColumn = DistPartitionKey(relationId);
if (partitionColumn != NULL &&
candidatePartitionColumn->varattno == partitionColumn->varattno)
{
partitionColumnOpExpressionList = lappend(partitionColumnOpExpressionList,
whereClause);
}
}
return partitionColumnOpExpressionList;
}
/*
* ReplaceColumnsInOpExpressionList walks over the given operator expression
* list and copies every one them, replaces columns with the given new column
* and finally returns new copies in a new list of operator expressions.
*/
List *
ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn)
{
List *newOpExpressionList = NIL;
ListCell *opExpressionCell = NULL;
foreach(opExpressionCell, opExpressionList)
{
OpExpr *opExpression = (OpExpr *) lfirst(opExpressionCell);
OpExpr *copyOpExpression = (OpExpr *) copyObject(opExpression);
List *argumentList = copyOpExpression->args;
List *newArgumentList = NIL;
Node *leftArgument = (Node *) linitial(argumentList);
Node *rightArgument = (Node *) lsecond(argumentList);
Node *strippedLeftArgument = strip_implicit_coercions(leftArgument);
Node *strippedRightArgument = strip_implicit_coercions(rightArgument);
if (IsA(strippedLeftArgument, Var))
{
newArgumentList = list_make2(newColumn, strippedRightArgument);
}
else if (IsA(strippedRightArgument, Var))
{
newArgumentList = list_make2(strippedLeftArgument, newColumn);
}
copyOpExpression->args = newArgumentList;
newOpExpressionList = lappend(newOpExpressionList, copyOpExpression);
}
return newOpExpressionList;
}
/*
* WorkerLimitCount checks if the given extended node contains a limit node, and
* if that node can be pushed down. For this, the function checks if this limit

View File

@ -98,9 +98,6 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
static Relids QueryRteIdentities(Query *queryTree);
static DeferredErrorMessage * DeferErrorIfUnsupportedSublinkAndReferenceTable(
Query *queryTree);
static DeferredErrorMessage * DeferErrorIfUnsupportedFilters(Query *subquery);
static bool EqualOpExpressionLists(List *firstOpExpressionList,
List *secondOpExpressionList);
static DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree,
bool
outerMostQueryHasLimit);
@ -702,12 +699,6 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
{
return error;
}
error = DeferErrorIfUnsupportedFilters(subquery);
if (error)
{
return error;
}
}
return NULL;
@ -975,129 +966,6 @@ DeferErrorIfUnsupportedSublinkAndReferenceTable(Query *queryTree)
}
/*
* DeferErrorIfUnsupportedFilters checks if all leaf queries in the given query have
* same filter on the partition column. Note that if there are queries without
* any filter on the partition column, they don't break this prerequisite.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedFilters(Query *subquery)
{
List *queryList = NIL;
ListCell *queryCell = NULL;
List *subqueryOpExpressionList = NIL;
List *relationIdList = RelationIdList(subquery);
Var *partitionColumn = NULL;
Oid relationId = InvalidOid;
/*
* If there are no appropriate relations, we're going to error out on
* DeferErrorIfCannotPushdownSubquery(). It may happen once the subquery
* does not include a relation.
*/
if (relationIdList == NIL)
{
return NULL;
}
/*
* Get relation id of any relation in the subquery and create partiton column
* for this relation. We will use this column to replace columns on operator
* expressions on different tables. Then we compare these operator expressions
* to see if they consist of same operator and constant value.
*/
relationId = linitial_oid(relationIdList);
partitionColumn = PartitionColumn(relationId, 0);
ExtractQueryWalker((Node *) subquery, &queryList);
foreach(queryCell, queryList)
{
Query *query = (Query *) lfirst(queryCell);
List *opExpressionList = NIL;
List *newOpExpressionList = NIL;
bool leafQuery = LeafQuery(query);
if (!leafQuery)
{
continue;
}
opExpressionList = PartitionColumnOpExpressionList(query);
if (opExpressionList == NIL)
{
continue;
}
newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList,
partitionColumn);
if (subqueryOpExpressionList == NIL)
{
subqueryOpExpressionList = newOpExpressionList;
}
else
{
bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList,
newOpExpressionList);
if (!equalOpExpressionLists)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot push down this subquery",
"Currently all leaf queries need to "
"have same filters on partition column", NULL);
}
}
}
return NULL;
}
/*
* EqualOpExpressionLists checks if given two operator expression lists are
* equal.
*/
static bool
EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList)
{
bool equalOpExpressionLists = false;
ListCell *firstOpExpressionCell = NULL;
uint32 equalOpExpressionCount = 0;
uint32 firstOpExpressionCount = list_length(firstOpExpressionList);
uint32 secondOpExpressionCount = list_length(secondOpExpressionList);
if (firstOpExpressionCount != secondOpExpressionCount)
{
return false;
}
foreach(firstOpExpressionCell, firstOpExpressionList)
{
OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell);
ListCell *secondOpExpressionCell = NULL;
foreach(secondOpExpressionCell, secondOpExpressionList)
{
OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell);
bool equalExpressions = equal(firstOpExpression, secondOpExpression);
if (equalExpressions)
{
equalOpExpressionCount++;
continue;
}
}
}
if (equalOpExpressionCount == firstOpExpressionCount)
{
equalOpExpressionLists = true;
}
return equalOpExpressionLists;
}
/*
* DeferErrorIfCannotPushdownSubquery checks if we can push down the given
* subquery to worker nodes. If we cannot push down the subquery, this function

View File

@ -120,9 +120,6 @@ extern Oid FunctionOid(const char *schemaName, const char *functionName,
extern List * SubqueryMultiTableList(MultiNode *multiNode);
extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList);
extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);

View File

@ -777,9 +777,18 @@ HINT: Consider using an equality filter on the distributed table's partition co
SELECT * FROM (
(SELECT * FROM articles_hash_mx WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu;
ERROR: cannot push down this subquery
DETAIL: Currently all leaf queries need to have same filters on partition column
(SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu
ORDER BY 1, 2
LIMIT 5;
id | author_id | title | word_count
----+-----------+------------+------------
1 | 1 | arsenous | 9572
2 | 2 | abducing | 13642
11 | 1 | alamo | 1347
12 | 2 | archiblast | 18185
21 | 1 | arcading | 5890
(5 rows)
-- error out for queries with repartition jobs
SELECT *
FROM articles_hash_mx a, articles_hash_mx b

View File

@ -881,7 +881,7 @@ DEBUG: Plan is router executable
az
(4 rows)
-- union queries are not supported if not router plannable
-- top-level union queries are not supported if not router plannable
-- there is an inconsistency on shard pruning between
-- ubuntu/mac disabling log messages for this queries only
SET client_min_messages to 'NOTICE';
@ -890,12 +890,22 @@ UNION
(SELECT * FROM articles_hash WHERE author_id = 2);
ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT
HINT: Consider using an equality filter on the distributed table's partition column.
-- unions in subqueries are supported with subquery pushdown
SELECT * FROM (
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 2)) uu;
ERROR: cannot push down this subquery
DETAIL: Currently all leaf queries need to have same filters on partition column
(SELECT * FROM articles_hash WHERE author_id = 2)) uu
ORDER BY 1, 2
LIMIT 5;
id | author_id | title | word_count
----+-----------+------------+------------
1 | 1 | arsenous | 9572
2 | 2 | abducing | 13642
11 | 1 | alamo | 1347
12 | 2 | archiblast | 18185
21 | 1 | arcading | 5890
(5 rows)
-- error out for queries with repartition jobs
SELECT *
FROM articles_hash a, articles_hash b

View File

@ -267,6 +267,50 @@ SELECT max(l_orderkey) FROM
14947
(1 row)
-- Subqueries filter by 2 different users
SELECT *
FROM
(SELECT *
FROM
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
GROUP BY user_id) AS foo,
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 3
GROUP BY user_id) AS bar
WHERE foo.user_id = bar.user_id ) AS baz;
user_id | counter | user_id | counter
---------+---------+---------+---------
(0 rows)
-- Subqueries filter by different users, one of which overlaps
SELECT *
FROM
(SELECT *
FROM
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
OR user_id = 3
GROUP BY user_id) AS foo,
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
GROUP BY user_id) AS bar
WHERE foo.user_id = bar.user_id ) AS baz
ORDER BY 1,2
LIMIT 5;
user_id | counter | user_id | counter
---------+---------+---------+---------
2 | 57 | 2 | 57
(1 row)
-- Add one more shard to one relation, then test if we error out because of different
-- shard counts for joining relations.
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id

View File

@ -23,6 +23,21 @@ LIMIT 5;
2 | 4
(5 rows)
-- can use different filters on partition columns
SELECT *
FROM (
SELECT user_id, max(value_2) FROM users_table WHERE user_id = 1 GROUP BY user_id
UNION ALL
SELECT user_id, max(value_2) FROM users_table WHERE user_id = 5 GROUP BY user_id
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
user_id | max
---------+-----
5 | 5
1 | 4
(2 rows)
-- a very simple union query with reference table
SELECT user_id, counter
FROM (

View File

@ -367,7 +367,9 @@ UNION
SELECT * FROM (
(SELECT * FROM articles_hash_mx WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu;
(SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu
ORDER BY 1, 2
LIMIT 5;
-- error out for queries with repartition jobs
SELECT *

View File

@ -422,7 +422,7 @@ SELECT * FROM (
) AS combination
ORDER BY 1;
-- union queries are not supported if not router plannable
-- top-level union queries are not supported if not router plannable
-- there is an inconsistency on shard pruning between
-- ubuntu/mac disabling log messages for this queries only
@ -432,11 +432,13 @@ SET client_min_messages to 'NOTICE';
UNION
(SELECT * FROM articles_hash WHERE author_id = 2);
-- unions in subqueries are supported with subquery pushdown
SELECT * FROM (
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 2)) uu;
(SELECT * FROM articles_hash WHERE author_id = 2)) uu
ORDER BY 1, 2
LIMIT 5;
-- error out for queries with repartition jobs
SELECT *

View File

@ -233,6 +233,44 @@ SELECT max(l_orderkey) FROM
) z
) y;
-- Subqueries filter by 2 different users
SELECT *
FROM
(SELECT *
FROM
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
GROUP BY user_id) AS foo,
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 3
GROUP BY user_id) AS bar
WHERE foo.user_id = bar.user_id ) AS baz;
-- Subqueries filter by different users, one of which overlaps
SELECT *
FROM
(SELECT *
FROM
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
OR user_id = 3
GROUP BY user_id) AS foo,
(SELECT user_id,
sum(value_2) AS counter
FROM events_table
WHERE user_id = 2
GROUP BY user_id) AS bar
WHERE foo.user_id = bar.user_id ) AS baz
ORDER BY 1,2
LIMIT 5;
-- Add one more shard to one relation, then test if we error out because of different
-- shard counts for joining relations.

View File

@ -17,6 +17,16 @@ FROM (
ORDER BY 2 DESC,1
LIMIT 5;
-- can use different filters on partition columns
SELECT *
FROM (
SELECT user_id, max(value_2) FROM users_table WHERE user_id = 1 GROUP BY user_id
UNION ALL
SELECT user_id, max(value_2) FROM users_table WHERE user_id = 5 GROUP BY user_id
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
-- a very simple union query with reference table
SELECT user_id, counter
FROM (