Fix aggregate issues

The aggtranstype field is somewhat tricky to set, but I discovered that
get_agg_clause_costs would set it correctly for me.

Also wraps all array_agg calls in a array_sort function at test time in
order to avoid random ordering issues.
pull/858/head
Jason Petersen 2016-10-05 09:58:15 -06:00
parent fa4421099f
commit 8a626c0731
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
4 changed files with 126 additions and 57 deletions

View File

@ -39,6 +39,7 @@
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_oper.h"
#include "utils/builtins.h"
@ -1271,8 +1272,17 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
bool hasAggregates = contain_agg_clause((Node *) originalExpression);
if (hasAggregates)
{
AggClauseCosts aggregateCosts;
Node *newNode = MasterAggregateMutator((Node *) originalExpression,
walkerContext);
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) newNode, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) newNode, &aggregateCosts);
#endif
newExpression = (Expr *) newNode;
}
else
@ -1297,7 +1307,17 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
if (originalHavingQual != NULL)
{
AggClauseCosts aggregateCosts;
newHavingQual = MasterAggregateMutator(originalHavingQual, walkerContext);
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) newHavingQual, AGGSPLIT_SIMPLE,
&aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) newHavingQual, &aggregateCosts);
#endif
}
masterExtendedOpNode = CitusMakeNode(MultiExtendedOp);
@ -1468,6 +1488,8 @@ MasterAggregateExpression(Aggref *originalAggregate,
unionAggregate->args = list_make1(hllTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL;
/* TODO: Fix this for 9.6 */
cardinalityExpression = makeNode(FuncExpr);
cardinalityExpression->funcid = cardinalityFunctionId;
cardinalityExpression->funcresulttype = cardinalityReturnType;
@ -1527,8 +1549,8 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate->aggfnoid = sumFunctionId;
newMasterAggregate->aggtype = masterReturnType;
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = INTERNALOID;
newMasterAggregate->aggargtypes = list_make1_oid(NUMERICOID);
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
@ -1595,6 +1617,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate = copyObject(originalAggregate);
newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->args = list_make1(arrayCatAggArgument);
#if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
newMasterExpression = (Expr *) newMasterAggregate;
}
@ -1688,8 +1715,8 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
firstSum->args = list_make1(firstTargetEntry);
firstSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
firstSum->aggtranstype = INTERNALOID;
firstSum->aggargtypes = list_make1_oid(NUMERICOID);
firstSum->aggtranstype = InvalidOid;
firstSum->aggargtypes = list_make1_oid(firstSum->aggtype);
firstSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
@ -1705,8 +1732,8 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
secondSum->args = list_make1(secondTargetEntry);
secondSum->aggkind = AGGKIND_NORMAL;
#if (PG_VERSION_NUM >= 90600)
secondSum->aggtranstype = INTERNALOID;
secondSum->aggargtypes = list_make1_oid(NUMERICOID);
secondSum->aggtranstype = InvalidOid;
secondSum->aggargtypes = list_make1_oid(firstSum->aggtype);
secondSum->aggsplit = AGGSPLIT_SIMPLE;
#endif
@ -1812,7 +1839,16 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
if (hasAggregates)
{
AggClauseCosts aggregateCosts;
WorkerAggregateWalker((Node *) originalExpression, walkerContext);
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) walkerContext->expressionList,
AGGSPLIT_SIMPLE, &aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) walkerContext->expressionList,
&aggregateCosts);
#endif
newExpressionList = walkerContext->expressionList;
}
else
@ -1880,6 +1916,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
{
List *newExpressionList = NIL;
ListCell *newExpressionCell = NULL;
AggClauseCosts aggregateCosts;
/* reset walker context */
walkerContext->expressionList = NIL;
@ -1887,6 +1924,15 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
WorkerAggregateWalker(havingQual, walkerContext);
newExpressionList = walkerContext->expressionList;
memset(&aggregateCosts, 0, sizeof(aggregateCosts));
#if PG_VERSION_NUM >= 90600
get_agg_clause_costs(NULL, (Node *) walkerContext->expressionList,
AGGSPLIT_SIMPLE, &aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) walkerContext->expressionList,
&aggregateCosts);
#endif
/* now create target entries for each new expression */
foreach(newExpressionCell, newExpressionList)
@ -2075,9 +2121,20 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
sumAggregate->aggfnoid = AggregateFunctionOid(sumAggregateName, argumentType);
sumAggregate->aggtype = get_func_rettype(sumAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
sumAggregate->aggtranstype = InvalidOid;
sumAggregate->aggargtypes = list_make1_oid(argumentType);
sumAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
/* count has any input type */
countAggregate->aggfnoid = AggregateFunctionOid(countAggregateName, ANYOID);
countAggregate->aggtype = get_func_rettype(countAggregate->aggfnoid);
#if (PG_VERSION_NUM >= 90600)
countAggregate->aggtranstype = InvalidOid;
countAggregate->aggargtypes = list_make1_oid(argumentType);
countAggregate->aggsplit = AGGSPLIT_SIMPLE;
#endif
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);

View File

@ -144,6 +144,7 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
#if (PG_VERSION_NUM >= 90600)
get_agg_clause_costs(NULL, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE,
&aggregateCosts);
get_agg_clause_costs(NULL, (Node *) havingQual, AGGSPLIT_SIMPLE, &aggregateCosts);
#else
count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts);
#endif
@ -188,7 +189,8 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
groupColumnOpArray, NIL, NIL,
rowEstimate, subPlan);
#else
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual, aggregateStrategy,
aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual,
aggregateStrategy,
&aggregateCosts, groupColumnCount, groupColumnIdArray,
groupColumnOpArray, NIL, rowEstimate, subPlan);
#endif

View File

@ -3,6 +3,11 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
SELECT array_cat_agg(i) FROM (VALUES (ARRAY[1,2]), (NULL), (ARRAY[3,4])) AS t(i);
array_cat_agg
@ -18,68 +23,68 @@ ERROR: array_agg with order by is unsupported
SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
ERROR: array_agg with order by is unsupported
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------
{155190,67310,63700,2132,24027,15635}
{2132,15635,24027,63700,67310,155190}
{106170}
{4297,19036,128449,29380,183095,62143}
{4297,19036,29380,62143,128449,183095}
{88035}
{108570,123927,37531}
{37531,108570,123927}
{139636}
{182052,145243,94780,163073,151894,79251,157238}
{82704,197921,44161,2743,85811,11615}
{61336,60519,137469,33918}
{79251,94780,145243,151894,157238,163073,182052}
{2743,11615,44161,82704,85811,197921}
{33918,60519,61336,137469}
{88362,89414,169544}
(10 rows)
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
-----------------------------------------------------------------
{21168.23,45983.16,13309.60,28955.64,22824.48,49620.16}
{13309.60,21168.23,22824.48,28955.64,45983.16,49620.16}
{44694.46}
{54058.05,46796.47,39890.88,2618.76,32986.52,28733.64}
{2618.76,28733.64,32986.52,39890.88,46796.47,54058.05}
{30690.90}
{23678.55,50723.92,73426.50}
{61998.31}
{13608.60,11594.16,81639.88,31809.96,73943.82,43058.75,6476.15}
{47227.60,64605.44,2210.32,6582.96,79059.64,9159.66}
{40217.23,47344.32,7532.30,75928.31}
{17554.68,30875.02,9681.24}
{6476.15,11594.16,13608.60,31809.96,43058.75,73943.82,81639.88}
{2210.32,6582.96,9159.66,47227.60,64605.44,79059.64}
{7532.30,40217.23,47344.32,75928.31}
{9681.24,17554.68,30875.02}
(10 rows)
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
--------------------------------------------------------------------------------
{03-13-1996,04-12-1996,01-29-1996,04-21-1996,03-30-1996,01-30-1996}
{01-29-1996,01-30-1996,03-13-1996,03-30-1996,04-12-1996,04-21-1996}
{01-28-1997}
{02-02-1994,11-09-1993,01-16-1994,12-04-1993,12-14-1993,10-29-1993}
{10-29-1993,11-09-1993,12-04-1993,12-14-1993,01-16-1994,02-02-1994}
{01-10-1996}
{10-31-1994,10-16-1994,08-08-1994}
{08-08-1994,10-16-1994,10-31-1994}
{04-27-1992}
{05-07-1996,02-01-1996,01-15-1996,03-21-1996,02-11-1996,01-16-1996,02-10-1996}
{10-23-1995,08-14-1995,08-07-1995,08-04-1995,08-28-1995,07-21-1995}
{10-29-1993,12-09-1993,12-09-1993,11-09-1993}
{10-23-1998,10-09-1998,10-30-1998}
{01-15-1996,01-16-1996,02-01-1996,02-10-1996,02-11-1996,03-21-1996,05-07-1996}
{07-21-1995,08-04-1995,08-07-1995,08-14-1995,08-28-1995,10-23-1995}
{10-29-1993,11-09-1993,12-09-1993,12-09-1993}
{10-09-1998,10-23-1998,10-30-1998}
(10 rows)
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
array_agg
array_sort
----------------------------------------------------------------------------------------------
{"TRUCK ","MAIL ","REG AIR ","AIR ","FOB ","MAIL "}
{"AIR ","FOB ","MAIL ","MAIL ","REG AIR ","TRUCK "}
{"RAIL "}
{"AIR ","RAIL ","SHIP ","TRUCK ","FOB ","RAIL "}
{"AIR ","FOB ","RAIL ","RAIL ","SHIP ","TRUCK "}
{"REG AIR "}
{"AIR ","FOB ","AIR "}
{"AIR ","AIR ","FOB "}
{"TRUCK "}
{"FOB ","SHIP ","MAIL ","FOB ","TRUCK ","FOB ","FOB "}
{"TRUCK ","AIR ","AIR ","REG AIR ","AIR ","RAIL "}
{"TRUCK ","MAIL ","AIR ","MAIL "}
{"REG AIR ","FOB ","FOB "}
{"FOB ","FOB ","FOB ","FOB ","MAIL ","SHIP ","TRUCK "}
{"AIR ","AIR ","AIR ","RAIL ","REG AIR ","TRUCK "}
{"AIR ","MAIL ","MAIL ","TRUCK "}
{"FOB ","FOB ","REG AIR "}
(10 rows)
-- Check that we can execute array_agg() within other functions
@ -93,10 +98,10 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- shards and contain different aggregates, filter clauses and other complex
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | count | avg | array_agg
l_quantity | count | avg | array_sort
------------+-------+-----------------------+--------------------------------------------------------------------------------------------------
1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476}
2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476}
@ -104,21 +109,21 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li
4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473}
(4 rows)
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | my_month
------------+------------------------------------------------
1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5}
2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5}
3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3}
4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10}
1.00 | {2,3,4,4,4,5,5,5,6,7,7,7,7,9,9,11,11}
2.00 | {1,3,5,5,5,5,6,6,6,7,7,8,10,10,11,11,11,12,12}
3.00 | {3,4,5,6,7,7,8,8,8,9,9,10,11,11}
4.00 | {1,1,1,2,2,2,5,5,6,6,6,6,8,9,10,10,11,11,12}
(4 rows)
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | array_agg
l_quantity | array_sort
------------+---------------------------------------------
1.00 | {11269,11397,11713,11715,11973,18317,18445}
2.00 | {11847,18061,18247,18953}

View File

@ -6,6 +6,11 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000;
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
-- Check multi_cat_agg() aggregate which is used to implement array_agg()
@ -21,16 +26,16 @@ SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem;
-- Check array_agg() for different data types and LIMIT clauses
SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey
SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey
ORDER BY l_orderkey LIMIT 10;
-- Check that we can execute array_agg() within other functions
@ -42,15 +47,15 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem;
-- expressions. Note that the l_orderkey ranges are such that the matching rows
-- lie in different shards.
SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem
SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem
WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500
GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month
FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5
AND octet_length(l_comment) + octet_length('randomtext'::text) > 40
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;