mirror of https://github.com/citusdata/citus.git
Duplicate grouping on worker whenever possible
This is possible whenever we aren't pulling up intermediate rows. We want to do this because it was done in 9.2, and some queries rely on the performance gained from grouping reducing rows to distinct values. This change was introduced when implementing window functions on the coordinator. pull/3722/head
parent
6a6d5af8a3
commit
4860e11561
|
@ -328,9 +328,6 @@ static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
|
||||||
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
|
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
|
||||||
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
|
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
|
||||||
static bool HasOrderByHllType(List *sortClauseList, List *targetList);
|
static bool HasOrderByHllType(List *sortClauseList, List *targetList);
|
||||||
static bool ShouldPushDownGroupingToWorker(MultiExtendedOp *opNode,
|
|
||||||
ExtendedOpNodeProperties *
|
|
||||||
extendedOpNodeProperties);
|
|
||||||
static bool ShouldProcessDistinctOrderAndLimitForWorker(
|
static bool ShouldProcessDistinctOrderAndLimitForWorker(
|
||||||
ExtendedOpNodeProperties *extendedOpNodeProperties,
|
ExtendedOpNodeProperties *extendedOpNodeProperties,
|
||||||
bool pushingDownOriginalGrouping,
|
bool pushingDownOriginalGrouping,
|
||||||
|
@ -2223,7 +2220,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
/* targetProjectionNumber starts from 1 */
|
/* targetProjectionNumber starts from 1 */
|
||||||
queryTargetList.targetProjectionNumber = 1;
|
queryTargetList.targetProjectionNumber = 1;
|
||||||
|
|
||||||
if (ShouldPushDownGroupingToWorker(originalOpNode, extendedOpNodeProperties))
|
if (!extendedOpNodeProperties->pullUpIntermediateRows)
|
||||||
{
|
{
|
||||||
queryGroupClause.groupClauseList = copyObject(originalGroupClauseList);
|
queryGroupClause.groupClauseList = copyObject(originalGroupClauseList);
|
||||||
}
|
}
|
||||||
|
@ -4717,46 +4714,6 @@ HasOrderByHllType(List *sortClauseList, List *targetList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ShouldPushDownGroupingToWorker returns whether we push down GROUP BY.
|
|
||||||
* This may return true even when GROUP BY is necessary on master.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
ShouldPushDownGroupingToWorker(MultiExtendedOp *opNode,
|
|
||||||
ExtendedOpNodeProperties *extendedOpNodeProperties)
|
|
||||||
{
|
|
||||||
if (extendedOpNodeProperties->pushDownGroupingAndHaving)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (extendedOpNodeProperties->pullUpIntermediateRows)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Duplicate grouping if we have LIMIT without HAVING, as this can
|
|
||||||
* often result in LIMIT being pushed down.
|
|
||||||
*/
|
|
||||||
if (opNode->havingQual == NULL && opNode->limitCount != NULL)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If aggregates are being split across worker & master, so must grouping.
|
|
||||||
*/
|
|
||||||
if (contain_aggs_of_level(opNode->havingQual, 0) ||
|
|
||||||
contain_aggs_of_level((Node *) opNode->targetList, 0))
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldProcessDistinctOrderAndLimitForWorker returns whether
|
* ShouldProcessDistinctOrderAndLimitForWorker returns whether
|
||||||
* ProcessDistinctClauseForWorkerQuery should be called. If not,
|
* ProcessDistinctClauseForWorkerQuery should be called. If not,
|
||||||
|
|
|
@ -453,8 +453,11 @@ HashAggregate
|
||||||
Tasks Shown: One of 2
|
Tasks Shown: One of 2
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
-> Seq Scan on public.lineitem_290000 lineitem
|
-> HashAggregate
|
||||||
Output: l_quantity, l_quantity
|
Output: l_quantity, l_quantity
|
||||||
|
Group Key: lineitem.l_quantity
|
||||||
|
-> Seq Scan on public.lineitem_290000 lineitem
|
||||||
|
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
|
||||||
-- Subquery pushdown tests with explain
|
-- Subquery pushdown tests with explain
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
SELECT
|
SELECT
|
||||||
|
|
|
@ -747,7 +747,7 @@ RESET citus.subquery_pushdown;
|
||||||
VACUUM ANALYZE users_table;
|
VACUUM ANALYZE users_table;
|
||||||
-- explain tests
|
-- explain tests
|
||||||
EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER BY 1;
|
EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER BY 1;
|
||||||
QUERY PLAN
|
QUERY PLAN
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
Sort
|
Sort
|
||||||
Sort Key: remote_scan.user_id
|
Sort Key: remote_scan.user_id
|
||||||
|
@ -758,17 +758,19 @@ EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
-> Nested Loop
|
-> HashAggregate
|
||||||
Join Filter: (users_table.user_id = users_table_1.user_id)
|
Group Key: users_table.user_id
|
||||||
-> Sort
|
-> Nested Loop
|
||||||
Sort Key: (max(users_table_1."time")) DESC
|
Join Filter: (users_table.user_id = users_table_1.user_id)
|
||||||
-> HashAggregate
|
-> Sort
|
||||||
Group Key: users_table_1.user_id
|
Sort Key: (max(users_table_1."time")) DESC
|
||||||
Filter: (max(users_table_1."time") > '2017-11-23 16:20:33.264457'::timestamp without time zone)
|
-> HashAggregate
|
||||||
-> Seq Scan on users_table_1400256 users_table_1
|
Group Key: users_table_1.user_id
|
||||||
-> Seq Scan on users_table_1400256 users_table
|
Filter: (max(users_table_1."time") > '2017-11-23 16:20:33.264457'::timestamp without time zone)
|
||||||
Filter: ((value_1 >= 1) AND (value_1 < 3))
|
-> Seq Scan on users_table_1400256 users_table_1
|
||||||
(19 rows)
|
-> Seq Scan on users_table_1400256 users_table
|
||||||
|
Filter: ((value_1 >= 1) AND (value_1 < 3))
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
EXPLAIN (COSTS FALSE) SELECT *
|
EXPLAIN (COSTS FALSE) SELECT *
|
||||||
FROM (
|
FROM (
|
||||||
|
|
Loading…
Reference in New Issue