Add HAVING support

This commit completes HAVING support in Citus by adding it to the real-time and
task-tracker executors. Multiple regression tests are added to cover the newly
supported queries.
pull/811/head
Metin Doslu 2016-09-27 15:43:09 +03:00
parent 736c73d008
commit d03a2af778
12 changed files with 336 additions and 42 deletions
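
In outline, a distributed HAVING query is now split in two: each worker computes
the partial aggregates the qual references and ships them as extra target columns,
and the master applies the qual after merging the intermediate results. A sketch of
the decomposition, adapted from the EXPLAIN regression test below (the merge table
and intermediate column names are illustrative):

-- original distributed query
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;

-- per-shard worker query: partial aggregates for the target list,
-- plus one extra column shipped solely for the HAVING qual
SELECT sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
FROM lineitem_290001;

-- master query over the merged intermediate results: the qual is
-- re-aggregated and evaluated here, not on the workers
SELECT sum(intermediate_column_0) / (sum(intermediate_column_1) / sum(intermediate_column_2))
FROM pg_merge_job
HAVING sum(intermediate_column_3) > 100;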

View File

@ -1245,6 +1245,8 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
List *targetEntryList = originalOpNode->targetList;
List *newTargetEntryList = NIL;
ListCell *targetEntryCell = NULL;
Node *originalHavingQual = originalOpNode->havingQual;
Node *newHavingQual = NULL;
MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode);
MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode);
MasterAggregateWalkerContext *walkerContext = palloc0(
@ -1293,12 +1295,18 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
}
if (originalHavingQual != NULL)
{
newHavingQual = MasterAggregateMutator(originalHavingQual, walkerContext);
}
masterExtendedOpNode = CitusMakeNode(MultiExtendedOp);
masterExtendedOpNode->targetList = newTargetEntryList;
masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList;
masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList;
masterExtendedOpNode->limitCount = originalOpNode->limitCount;
masterExtendedOpNode->limitOffset = originalOpNode->limitOffset;
masterExtendedOpNode->havingQual = newHavingQual;
return masterExtendedOpNode;
}
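
As with the target entries above it, MasterAggregateMutator rewrites each aggregate
in the qual so that it re-aggregates the intermediate column the workers ship up.
Roughly, per the EXPLAIN output in the regression tests (column name illustrative):

-- having qual as written in the original query
HAVING sum(l_quantity) > 100
-- having qual after MasterAggregateMutator, as evaluated in the master query
HAVING sum(intermediate_column_3) > '100'::numeric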
@ -1748,6 +1756,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
ListCell *targetEntryCell = NULL;
List *newTargetEntryList = NIL;
List *groupClauseList = copyObject(originalOpNode->groupClauseList);
Node *havingQual = originalOpNode->havingQual;
AttrNumber targetProjectionNumber = 1;
WorkerAggregateWalkerContext *walkerContext =
palloc0(sizeof(WorkerAggregateWalkerContext));
@ -1851,6 +1860,41 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
}
}
/* we also need to add having expressions to worker target list */
if (havingQual != NULL)
{
List *newExpressionList = NIL;
ListCell *newExpressionCell = NULL;
/* reset walker context */
walkerContext->expressionList = NIL;
walkerContext->createGroupByClause = false;
WorkerAggregateWalker(havingQual, walkerContext);
newExpressionList = walkerContext->expressionList;
/* now create target entries for each new expression */
foreach(newExpressionCell, newExpressionList)
{
TargetEntry *newTargetEntry = makeNode(TargetEntry);
StringInfo columnNameString = makeStringInfo();
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
newTargetEntry->expr = newExpression;
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
targetProjectionNumber);
newTargetEntry->resname = columnNameString->data;
/* force resjunk to false as we may need this on the master */
newTargetEntry->resjunk = false;
newTargetEntry->resno = targetProjectionNumber;
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
targetProjectionNumber++;
}
}
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
workerExtendedOpNode->targetList = newTargetEntryList;
workerExtendedOpNode->groupClauseList = groupClauseList;
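
Note that the worker never evaluates the qual itself; it only ships the expressions
the qual references. For a non-aggregate qual this can duplicate a grouping column in
the worker target list, as in the second EXPLAIN test below, where l_quantity appears
once for the projection and once for the qual. A sketch of the resulting worker query,
assuming WORKER_COLUMN_FORMAT expands to worker_column_N:

SELECT l_quantity, l_quantity AS worker_column_2
FROM lineitem_290001
GROUP BY l_quantity;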

View File

@ -375,12 +375,6 @@ ErrorIfQueryNotSupported(Query *queryTree)
errorDetail = "Subqueries other than in from-clause are currently unsupported";
}
-if (queryTree->havingQual != NULL)
-{
-preconditionsSatisfied = false;
-errorDetail = "Having qual is currently unsupported";
-}
if (queryTree->hasWindowFuncs)
{
preconditionsSatisfied = false;
@ -1264,7 +1258,6 @@ MultiProjectNode(List *targetEntryList)
/* extract the list of columns and remove any duplicates */
columnList = pull_var_clause_default((Node *) targetEntryList);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
@ -1290,6 +1283,7 @@ MultiExtendedOpNode(Query *queryTree)
extendedOpNode->sortClauseList = queryTree->sortClause;
extendedOpNode->limitCount = queryTree->limitCount;
extendedOpNode->limitOffset = queryTree->limitOffset;
extendedOpNode->havingQual = queryTree->havingQual;
return extendedOpNode;
}
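
Removing this check is what opens the gate: with the error gone and the qual captured
on the MultiExtendedOp node, queries that previously failed with "Having qual is
currently unsupported" now plan normally, e.g. this one from the regression tests:

SELECT author_id, sum(word_count) AS corpus_size FROM articles
GROUP BY author_id
HAVING sum(word_count) > 25000
ORDER BY sum(word_count) DESC
LIMIT 5;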

View File

@ -124,8 +124,11 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
AttrNumber *groupColumnIdArray = NULL;
List *aggregateTargetList = NIL;
List *groupColumnList = NIL;
List *aggregateColumnList = NIL;
List *havingColumnList = NIL;
List *columnList = NIL;
ListCell *columnCell = NULL;
Node *havingQual = NULL;
Oid *groupColumnOpArray = NULL;
uint32 groupColumnCount = 0;
const long rowEstimate = 10;
@ -134,14 +137,21 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
Assert(masterQuery->hasAggs || masterQuery->groupClause);
aggregateTargetList = masterQuery->targetList;
-memset(&aggregateCosts, 0, sizeof(AggClauseCosts));
havingQual = masterQuery->havingQual;
/* estimate aggregate execution costs */
MemSet(&aggregateCosts, 0, sizeof(AggClauseCosts));
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
count_agg_clauses(NULL, havingQual, &aggregateCosts);
/*
* For upper level plans above the sequential scan, the planner expects the
* table id (varno) to be set to OUTER_VAR.
*/
-columnList = pull_var_clause_default((Node *) aggregateTargetList);
aggregateColumnList = pull_var_clause_default((Node *) aggregateTargetList);
havingColumnList = pull_var_clause_default(havingQual);
columnList = list_concat(aggregateColumnList, havingColumnList);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
@ -168,9 +178,10 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
}
/* finally create the plan */
-aggregatePlan = make_agg(NULL, aggregateTargetList, NIL, aggregateStrategy,
-&aggregateCosts, groupColumnCount, groupColumnIdArray,
-groupColumnOpArray, NIL, rowEstimate, subPlan);
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy, &aggregateCosts, groupColumnCount,
groupColumnIdArray, groupColumnOpArray, NIL,
rowEstimate, subPlan);
return aggregatePlan;
}
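
make_agg takes the qual as a List of implicitly ANDed clauses (hence the cast) and
attaches it to the Agg plan node, where the executor evaluates it once per group
after the aggregates are finalized, which is exactly HAVING semantics. The second
count_agg_clauses() call is needed because the qual may reference aggregates that
appear nowhere in the target list, as in this regression-test query:

SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000
ORDER BY author_id;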

View File

@ -513,6 +513,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
ListCell *columnCell = NULL;
FromExpr *joinTree = NULL;
Node *joinRoot = NULL;
Node *havingQual = NULL;
/* we start building jobs from below the collect node */
Assert(!CitusIsA(multiNode, MultiCollect));
@ -590,6 +591,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
limitCount = extendedOp->limitCount;
limitOffset = extendedOp->limitOffset;
sortClauseList = extendedOp->sortClauseList;
havingQual = extendedOp->havingQual;
}
/* build group clauses */
@ -631,6 +633,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
jobQuery->groupClause = groupClauseList;
jobQuery->limitOffset = limitOffset;
jobQuery->limitCount = limitCount;
jobQuery->havingQual = havingQual;
jobQuery->hasAggs = contain_agg_clause((Node *) targetList);
return jobQuery;
@ -721,6 +724,7 @@ BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependedJobList)
reduceQuery->groupClause = extendedOpNode->groupClauseList;
reduceQuery->limitOffset = extendedOpNode->limitOffset;
reduceQuery->limitCount = extendedOpNode->limitCount;
reduceQuery->havingQual = extendedOpNode->havingQual;
reduceQuery->hasAggs = contain_agg_clause((Node *) targetList);
return reduceQuery;

View File

@ -346,6 +346,7 @@ _outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
WRITE_NODE_FIELD(sortClauseList);
WRITE_NODE_FIELD(limitCount);
WRITE_NODE_FIELD(limitOffset);
WRITE_NODE_FIELD(havingQual);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
}

View File

@ -171,6 +171,7 @@ typedef struct MultiExtendedOp
List *sortClauseList;
Node *limitCount;
Node *limitOffset;
Node *havingQual;
} MultiExtendedOp;

View File

@ -364,11 +364,54 @@ Distributed Query into pg_merge_job_570012
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Seq Scan on pg_merge_job_570012
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
Distributed Query into pg_merge_job_570013
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_570013_0) / (sum(intermediate_column_570013_1) / sum(intermediate_column_570013_2)))
Filter: (sum(pg_merge_job_570013.intermediate_column_570013_3) > '100'::numeric)
-> Seq Scan on pg_temp_2.pg_merge_job_570013
Output: intermediate_column_570013_0, intermediate_column_570013_1, intermediate_column_570013_2, intermediate_column_570013_3
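Reading off the worker Output line above, the four intermediate columns map back to
the original expression as follows (an annotation, not part of the expected output):
-- intermediate_column_570013_0 = sum(l_quantity)   (numerator of the target expression)
-- intermediate_column_570013_1 = sum(l_quantity)   (avg() numerator)
-- intermediate_column_570013_2 = count(l_quantity) (avg() denominator)
-- intermediate_column_570013_3 = sum(l_quantity)   (shipped solely for the HAVING qual)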
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT l_quantity FROM lineitem
GROUP BY l_quantity
HAVING l_quantity > (100 * random());
Distributed Query into pg_merge_job_570014
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_quantity, l_quantity
Group Key: lineitem.l_quantity
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> HashAggregate
Output: intermediate_column_570014_0
Group Key: pg_merge_job_570014.intermediate_column_570014_0
Filter: ((pg_merge_job_570014.intermediate_column_570014_1)::double precision > ('100'::double precision * random()))
-> Seq Scan on pg_temp_2.pg_merge_job_570014
Output: intermediate_column_570014_0, intermediate_column_570014_1
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
-Distributed Query into pg_merge_job_570013
Distributed Query into pg_merge_job_570015
Executor: Real-Time
Task Count: 4
Tasks Shown: All
@ -394,7 +437,7 @@ Distributed Query into pg_merge_job_570013
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
--> Seq Scan on pg_merge_job_570013
-> Seq Scan on pg_merge_job_570015
SELECT true AS valid FROM explain_xml($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
@ -406,7 +449,7 @@ SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
-Distributed Query into pg_merge_job_570016
Distributed Query into pg_merge_job_570018
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
@ -417,7 +460,7 @@ Distributed Query into pg_merge_job_570016
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
--> Seq Scan on pg_merge_job_570016
-> Seq Scan on pg_merge_job_570018
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
@ -426,7 +469,7 @@ EXPLAIN (COSTS FALSE)
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
-Distributed Query into pg_merge_job_570019
Distributed Query into pg_merge_job_570021
Executor: Task-Tracker
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
@ -438,7 +481,7 @@ Distributed Query into pg_merge_job_570019
Merge Task Count: 1
Master Query
-> Aggregate
--> Seq Scan on pg_merge_job_570019
-> Seq Scan on pg_merge_job_570021
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
@ -473,8 +516,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_570022",
"Alias": "pg_merge_job_570022"
"Relation Name": "pg_merge_job_570024",
"Alias": "pg_merge_job_570024"
}
]
}
@ -523,8 +566,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
-<Relation-Name>pg_merge_job_570028</Relation-Name>
-<Alias>pg_merge_job_570028</Alias>
<Relation-Name>pg_merge_job_570030</Relation-Name>
<Alias>pg_merge_job_570030</Alias>
</Plan>
</Plans>
</Plan>
@ -562,5 +605,5 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570034"
Alias: "pg_merge_job_570034"
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"

View File

@ -156,17 +156,19 @@ SELECT title, author_id FROM articles
alkylic | 8
(10 rows)
--- add in some grouping expressions.
--- it is supported if it is on the same shard, but not supported if it
--- involves multiple shards.
--- having queries unsupported in Citus
-- add in some grouping expressions
SELECT author_id, sum(word_count) AS corpus_size FROM articles
WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
GROUP BY author_id
HAVING sum(word_count) > 40000
ORDER BY sum(word_count) DESC;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Having qual is currently unsupported
author_id | corpus_size
-----------+-------------
2 | 61782
10 | 59955
8 | 55410
(3 rows)
-- UNION/INTERSECT queries are unsupported if on multiple shards
SELECT * FROM articles WHERE author_id = 10 UNION
SELECT * FROM articles WHERE author_id = 2;
@ -288,21 +290,91 @@ SELECT FROM articles WHERE word_count = 65500;
--
(0 rows)
--- having queries unsupported in Citus
-- having queries supported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles
GROUP BY author_id
HAVING sum(word_count) > 25000
ORDER BY sum(word_count) DESC
LIMIT 5;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Having qual is currently unsupported
--- more proof Citus doesn't support having clauses
author_id | corpus_size
-----------+-------------
4 | 66325
2 | 61782
10 | 59955
8 | 55410
6 | 50867
(5 rows)
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000
ORDER BY author_id;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Having qual is currently unsupported
author_id
-----------
2
4
6
8
10
(5 rows)
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000 AND author_id < 5
ORDER BY author_id;
author_id
-----------
2
4
(2 rows)
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000 OR author_id < 5
ORDER BY author_id;
author_id
-----------
1
2
3
4
6
8
10
(7 rows)
SELECT author_id FROM articles
GROUP BY author_id
HAVING author_id <= 2 OR author_id = 8
ORDER BY author_id;
author_id
-----------
1
2
8
(3 rows)
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
GROUP BY o_orderstatus
HAVING count(*) > 1450 OR avg(o_totalprice) > 150000
ORDER BY o_orderstatus;
o_orderstatus | count | avg
---------------+-------+---------------------
O | 1460 | 143355.847013698630
P | 75 | 164847.914533333333
(2 rows)
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey AND l_orderkey > 9030
GROUP BY o_orderstatus
HAVING sum(l_linenumber) > 1000
ORDER BY o_orderstatus;
o_orderstatus | sum | avg
---------------+------+--------------------
F | 8559 | 3.0126715945089757
O | 8901 | 3.0050641458474004
(2 rows)
-- now, test the cases where Citus does or does not need to create
-- the master queries
SET citus.large_table_shard_count TO 2;

View File

@ -42,6 +42,44 @@ limit 10;
19 | 2 | 4.0000000000000000
(10 rows)
-- same query above, just replace outer where clause with inner having clause
select
number_sum,
count(*) as total,
avg(total_count) avg_count
from
(select
l_suppkey,
l_linestatus,
sum(l_linenumber) as number_sum,
count(*) as total_count
from
lineitem
group by
l_suppkey,
l_linestatus
having
sum(l_linenumber) >= 10) as distributed_table
group by
number_sum
order by
total desc,
number_sum desc
limit 10;
number_sum | total | avg_count
------------+-------+--------------------
10 | 136 | 2.3970588235294118
11 | 97 | 2.6082474226804124
12 | 56 | 2.8392857142857143
13 | 42 | 2.8809523809523810
14 | 21 | 3.2857142857142857
16 | 10 | 3.5000000000000000
15 | 10 | 3.3000000000000000
17 | 6 | 3.3333333333333333
18 | 3 | 4.0000000000000000
19 | 2 | 4.0000000000000000
(10 rows)
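
For comparison, the "query above" that this test's comment refers to (its tail is
visible at the top of this hunk) presumably applies the same filter as an outer
WHERE, roughly as sketched below; with HAVING support the filter can move inside
the subquery, and both forms return the same ten rows:

select
	number_sum,
	count(*) as total,
	avg(total_count) avg_count
from
	(select
		l_suppkey,
		l_linestatus,
		sum(l_linenumber) as number_sum,
		count(*) as total_count
	from
		lineitem
	group by
		l_suppkey,
		l_linestatus) as distributed_table
where
	number_sum >= 10
group by
	number_sum
order by
	total desc,
	number_sum desc
limit 10;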
select
(l_suppkey / 100) as suppkey_bin,
avg(total_count) avg_count
@ -188,6 +226,22 @@ from
1.02907126318497555956
(1 row)
select
avg(different_shipment_days)
from
(select
count(distinct l_shipdate) as different_shipment_days
from
lineitem
group by
l_partkey
having
count(distinct l_shipdate) >= 2) as distributed_table;
avg
--------------------
2.0335365853658537
(1 row)
-- Check that if the subquery is pulled up, we don't error out and run the query properly.
SELECT max(l_suppkey) FROM
(

View File

@ -111,6 +111,17 @@ EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT l_quantity FROM lineitem
GROUP BY l_quantity
HAVING l_quantity > (100 * random());
-- Test all tasks output
SET citus.explain_all_tasks TO on;

View File

@ -109,10 +109,7 @@ SELECT title, author_id FROM articles
WHERE author_id = 7 OR author_id = 8
ORDER BY author_id ASC, id;
--- add in some grouping expressions.
--- it is supported if it is on the same shard, but not supported if it
--- involves multiple shards.
--- having queries unsupported in Citus
-- add in some grouping expressions
SELECT author_id, sum(word_count) AS corpus_size FROM articles
WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
GROUP BY author_id
@ -173,19 +170,44 @@ SELECT FROM articles WHERE author_id = 3737;
SELECT FROM articles WHERE word_count = 65500;
--- having queries unsupported in Citus
-- having queries supported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles
GROUP BY author_id
HAVING sum(word_count) > 25000
ORDER BY sum(word_count) DESC
LIMIT 5;
--- more proof Citus doesn't support having clauses
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000
ORDER BY author_id;
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000 AND author_id < 5
ORDER BY author_id;
SELECT author_id FROM articles
GROUP BY author_id
HAVING sum(word_count) > 50000 OR author_id < 5
ORDER BY author_id;
SELECT author_id FROM articles
GROUP BY author_id
HAVING author_id <= 2 OR author_id = 8
ORDER BY author_id;
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
GROUP BY o_orderstatus
HAVING count(*) > 1450 OR avg(o_totalprice) > 150000
ORDER BY o_orderstatus;
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey AND l_orderkey > 9030
GROUP BY o_orderstatus
HAVING sum(l_linenumber) > 1000
ORDER BY o_orderstatus;
-- now, test the cases where Citus does or does not need to create
-- the master queries
SET citus.large_table_shard_count TO 2;

View File

@ -34,6 +34,31 @@ order by
number_sum desc
limit 10;
-- same query above, just replace outer where clause with inner having clause
select
number_sum,
count(*) as total,
avg(total_count) avg_count
from
(select
l_suppkey,
l_linestatus,
sum(l_linenumber) as number_sum,
count(*) as total_count
from
lineitem
group by
l_suppkey,
l_linestatus
having
sum(l_linenumber) >= 10) as distributed_table
group by
number_sum
order by
total desc,
number_sum desc
limit 10;
select
(l_suppkey / 100) as suppkey_bin,
avg(total_count) avg_count
@ -141,6 +166,18 @@ from
group by
l_partkey) as distributed_table;
select
avg(different_shipment_days)
from
(select
count(distinct l_shipdate) as different_shipment_days
from
lineitem
group by
l_partkey
having
count(distinct l_shipdate) >= 2) as distributed_table;
-- Check that if the subquery is pulled up, we don't error out and run the query properly.
SELECT max(l_suppkey) FROM