From 8a626c07315ad1476faa4c48b542e567a0f09cc6 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 5 Oct 2016 09:58:15 -0600 Subject: [PATCH] Fix aggregate issues The aggtranstype field is somewhat tricky to set, but I discovered that get_agg_clause_costs would set it correctly for me. Also wraps all array_agg calls in an array_sort function at test time in order to avoid random ordering issues. --- .../planner/multi_logical_optimizer.c | 69 ++++++++++++-- .../planner/multi_master_planner.c | 4 +- src/test/regress/expected/multi_array_agg.out | 91 ++++++++++--------- src/test/regress/sql/multi_array_agg.sql | 19 ++-- 4 files changed, 126 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 882b3b1ff..c51ac9718 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -39,6 +39,7 @@ #include "optimizer/clauses.h" #include "optimizer/tlist.h" #include "optimizer/var.h" +#include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_oper.h" #include "utils/builtins.h" @@ -1271,8 +1272,17 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) bool hasAggregates = contain_agg_clause((Node *) originalExpression); if (hasAggregates) { + AggClauseCosts aggregateCosts; Node *newNode = MasterAggregateMutator((Node *) originalExpression, walkerContext); + +#if PG_VERSION_NUM >= 90600 + get_agg_clause_costs(NULL, (Node *) newNode, AGGSPLIT_SIMPLE, + &aggregateCosts); +#else + count_agg_clauses(NULL, (Node *) newNode, &aggregateCosts); +#endif + newExpression = (Expr *) newNode; } else @@ -1297,7 +1307,17 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) if (originalHavingQual != NULL) { + AggClauseCosts aggregateCosts; + newHavingQual = MasterAggregateMutator(originalHavingQual, walkerContext); + + memset(&aggregateCosts, 0, sizeof(aggregateCosts)); 
+#if PG_VERSION_NUM >= 90600 + get_agg_clause_costs(NULL, (Node *) newHavingQual, AGGSPLIT_SIMPLE, + &aggregateCosts); +#else + count_agg_clauses(NULL, (Node *) newHavingQual, &aggregateCosts); +#endif } masterExtendedOpNode = CitusMakeNode(MultiExtendedOp); @@ -1468,6 +1488,8 @@ MasterAggregateExpression(Aggref *originalAggregate, unionAggregate->args = list_make1(hllTargetEntry); unionAggregate->aggkind = AGGKIND_NORMAL; + /* TODO: Fix this for 9.6 */ + cardinalityExpression = makeNode(FuncExpr); cardinalityExpression->funcid = cardinalityFunctionId; cardinalityExpression->funcresulttype = cardinalityReturnType; @@ -1527,8 +1549,8 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterAggregate->aggfnoid = sumFunctionId; newMasterAggregate->aggtype = masterReturnType; #if (PG_VERSION_NUM >= 90600) - newMasterAggregate->aggtranstype = INTERNALOID; - newMasterAggregate->aggargtypes = list_make1_oid(NUMERICOID); + newMasterAggregate->aggtranstype = InvalidOid; + newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype); newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE; #endif @@ -1595,6 +1617,11 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterAggregate = copyObject(originalAggregate); newMasterAggregate->aggfnoid = aggregateFunctionId; newMasterAggregate->args = list_make1(arrayCatAggArgument); +#if (PG_VERSION_NUM >= 90600) + newMasterAggregate->aggtranstype = InvalidOid; + newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID); + newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE; +#endif newMasterExpression = (Expr *) newMasterAggregate; } @@ -1688,8 +1715,8 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType, firstSum->args = list_make1(firstTargetEntry); firstSum->aggkind = AGGKIND_NORMAL; #if (PG_VERSION_NUM >= 90600) - firstSum->aggtranstype = INTERNALOID; - firstSum->aggargtypes = list_make1_oid(NUMERICOID); + firstSum->aggtranstype = InvalidOid; + firstSum->aggargtypes = 
list_make1_oid(firstSum->aggtype); firstSum->aggsplit = AGGSPLIT_SIMPLE; #endif @@ -1705,8 +1732,8 @@ MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType, secondSum->args = list_make1(secondTargetEntry); secondSum->aggkind = AGGKIND_NORMAL; #if (PG_VERSION_NUM >= 90600) - secondSum->aggtranstype = INTERNALOID; - secondSum->aggargtypes = list_make1_oid(NUMERICOID); + secondSum->aggtranstype = InvalidOid; + secondSum->aggargtypes = list_make1_oid(firstSum->aggtype); secondSum->aggsplit = AGGSPLIT_SIMPLE; #endif @@ -1812,7 +1839,16 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode) if (hasAggregates) { + AggClauseCosts aggregateCosts; WorkerAggregateWalker((Node *) originalExpression, walkerContext); + +#if PG_VERSION_NUM >= 90600 + get_agg_clause_costs(NULL, (Node *) walkerContext->expressionList, + AGGSPLIT_SIMPLE, &aggregateCosts); +#else + count_agg_clauses(NULL, (Node *) walkerContext->expressionList, + &aggregateCosts); +#endif newExpressionList = walkerContext->expressionList; } else @@ -1880,6 +1916,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode) { List *newExpressionList = NIL; ListCell *newExpressionCell = NULL; + AggClauseCosts aggregateCosts; /* reset walker context */ walkerContext->expressionList = NIL; @@ -1887,6 +1924,15 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode) WorkerAggregateWalker(havingQual, walkerContext); newExpressionList = walkerContext->expressionList; + memset(&aggregateCosts, 0, sizeof(aggregateCosts)); + +#if PG_VERSION_NUM >= 90600 + get_agg_clause_costs(NULL, (Node *) walkerContext->expressionList, + AGGSPLIT_SIMPLE, &aggregateCosts); +#else + count_agg_clauses(NULL, (Node *) walkerContext->expressionList, + &aggregateCosts); +#endif /* now create target entries for each new expression */ foreach(newExpressionCell, newExpressionList) @@ -2075,9 +2121,20 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, sumAggregate->aggfnoid = AggregateFunctionOid(sumAggregateName, argumentType); 
sumAggregate->aggtype = get_func_rettype(sumAggregate->aggfnoid); +#if (PG_VERSION_NUM >= 90600) + sumAggregate->aggtranstype = InvalidOid; + sumAggregate->aggargtypes = list_make1_oid(argumentType); + sumAggregate->aggsplit = AGGSPLIT_SIMPLE; +#endif + /* count has any input type */ countAggregate->aggfnoid = AggregateFunctionOid(countAggregateName, ANYOID); countAggregate->aggtype = get_func_rettype(countAggregate->aggfnoid); +#if (PG_VERSION_NUM >= 90600) + countAggregate->aggtranstype = InvalidOid; + countAggregate->aggargtypes = list_make1_oid(argumentType); + countAggregate->aggsplit = AGGSPLIT_SIMPLE; +#endif workerAggregateList = lappend(workerAggregateList, sumAggregate); workerAggregateList = lappend(workerAggregateList, countAggregate); diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index a1541d3a9..f29756045 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -144,6 +144,7 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) #if (PG_VERSION_NUM >= 90600) get_agg_clause_costs(NULL, (Node *) aggregateTargetList, AGGSPLIT_SIMPLE, &aggregateCosts); + get_agg_clause_costs(NULL, (Node *) havingQual, AGGSPLIT_SIMPLE, &aggregateCosts); #else count_agg_clauses(NULL, (Node *) aggregateTargetList, &aggregateCosts); #endif @@ -188,7 +189,8 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) groupColumnOpArray, NIL, NIL, rowEstimate, subPlan); #else - aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual, aggregateStrategy, + aggregatePlan = make_agg(NULL, aggregateTargetList, (List *) havingQual, + aggregateStrategy, &aggregateCosts, groupColumnCount, groupColumnIdArray, groupColumnOpArray, NIL, rowEstimate, subPlan); #endif diff --git a/src/test/regress/expected/multi_array_agg.out b/src/test/regress/expected/multi_array_agg.out index b473b7967..7ce3556eb 100644 --- 
a/src/test/regress/expected/multi_array_agg.out +++ b/src/test/regress/expected/multi_array_agg.out @@ -3,6 +3,11 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000; +CREATE OR REPLACE FUNCTION array_sort (ANYARRAY) +RETURNS ANYARRAY LANGUAGE SQL +AS $$ +SELECT ARRAY(SELECT unnest($1) ORDER BY 1) +$$; -- Check multi_cat_agg() aggregate which is used to implement array_agg() SELECT array_cat_agg(i) FROM (VALUES (ARRAY[1,2]), (NULL), (ARRAY[3,4])) AS t(i); array_cat_agg @@ -18,68 +23,68 @@ ERROR: array_agg with order by is unsupported SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM lineitem; ERROR: array_agg with order by is unsupported -- Check array_agg() for different data types and LIMIT clauses -SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; - array_agg + array_sort -------------------------------------------------- - {155190,67310,63700,2132,24027,15635} + {2132,15635,24027,63700,67310,155190} {106170} - {4297,19036,128449,29380,183095,62143} + {4297,19036,29380,62143,128449,183095} {88035} - {108570,123927,37531} + {37531,108570,123927} {139636} - {182052,145243,94780,163073,151894,79251,157238} - {82704,197921,44161,2743,85811,11615} - {61336,60519,137469,33918} + {79251,94780,145243,151894,157238,163073,182052} + {2743,11615,44161,82704,85811,197921} + {33918,60519,61336,137469} {88362,89414,169544} (10 rows) -SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; - array_agg + array_sort ----------------------------------------------------------------- - {21168.23,45983.16,13309.60,28955.64,22824.48,49620.16} + {13309.60,21168.23,22824.48,28955.64,45983.16,49620.16} {44694.46} - 
{54058.05,46796.47,39890.88,2618.76,32986.52,28733.64} + {2618.76,28733.64,32986.52,39890.88,46796.47,54058.05} {30690.90} {23678.55,50723.92,73426.50} {61998.31} - {13608.60,11594.16,81639.88,31809.96,73943.82,43058.75,6476.15} - {47227.60,64605.44,2210.32,6582.96,79059.64,9159.66} - {40217.23,47344.32,7532.30,75928.31} - {17554.68,30875.02,9681.24} + {6476.15,11594.16,13608.60,31809.96,43058.75,73943.82,81639.88} + {2210.32,6582.96,9159.66,47227.60,64605.44,79059.64} + {7532.30,40217.23,47344.32,75928.31} + {9681.24,17554.68,30875.02} (10 rows) -SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; - array_agg + array_sort -------------------------------------------------------------------------------- - {03-13-1996,04-12-1996,01-29-1996,04-21-1996,03-30-1996,01-30-1996} + {01-29-1996,01-30-1996,03-13-1996,03-30-1996,04-12-1996,04-21-1996} {01-28-1997} - {02-02-1994,11-09-1993,01-16-1994,12-04-1993,12-14-1993,10-29-1993} + {10-29-1993,11-09-1993,12-04-1993,12-14-1993,01-16-1994,02-02-1994} {01-10-1996} - {10-31-1994,10-16-1994,08-08-1994} + {08-08-1994,10-16-1994,10-31-1994} {04-27-1992} - {05-07-1996,02-01-1996,01-15-1996,03-21-1996,02-11-1996,01-16-1996,02-10-1996} - {10-23-1995,08-14-1995,08-07-1995,08-04-1995,08-28-1995,07-21-1995} - {10-29-1993,12-09-1993,12-09-1993,11-09-1993} - {10-23-1998,10-09-1998,10-30-1998} + {01-15-1996,01-16-1996,02-01-1996,02-10-1996,02-11-1996,03-21-1996,05-07-1996} + {07-21-1995,08-04-1995,08-07-1995,08-14-1995,08-28-1995,10-23-1995} + {10-29-1993,11-09-1993,12-09-1993,12-09-1993} + {10-09-1998,10-23-1998,10-30-1998} (10 rows) -SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; - array_agg + array_sort ---------------------------------------------------------------------------------------------- - 
{"TRUCK ","MAIL ","REG AIR ","AIR ","FOB ","MAIL "} + {"AIR ","FOB ","MAIL ","MAIL ","REG AIR ","TRUCK "} {"RAIL "} - {"AIR ","RAIL ","SHIP ","TRUCK ","FOB ","RAIL "} + {"AIR ","FOB ","RAIL ","RAIL ","SHIP ","TRUCK "} {"REG AIR "} - {"AIR ","FOB ","AIR "} + {"AIR ","AIR ","FOB "} {"TRUCK "} - {"FOB ","SHIP ","MAIL ","FOB ","TRUCK ","FOB ","FOB "} - {"TRUCK ","AIR ","AIR ","REG AIR ","AIR ","RAIL "} - {"TRUCK ","MAIL ","AIR ","MAIL "} - {"REG AIR ","FOB ","FOB "} + {"FOB ","FOB ","FOB ","FOB ","MAIL ","SHIP ","TRUCK "} + {"AIR ","AIR ","AIR ","RAIL ","REG AIR ","TRUCK "} + {"AIR ","MAIL ","MAIL ","TRUCK "} + {"FOB ","FOB ","REG AIR "} (10 rows) -- Check that we can execute array_agg() within other functions @@ -93,10 +98,10 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem; -- shards and contain different aggregates, filter clauses and other complex -- expressions. Note that the l_orderkey ranges are such that the matching rows -- lie in different shards. -SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem +SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; - l_quantity | count | avg | array_agg + l_quantity | count | avg | array_sort ------------+-------+-----------------------+-------------------------------------------------------------------------------------------------- 1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476} 2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476} @@ -104,21 +109,21 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li 4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473} (4 rows) 
-SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month +SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; l_quantity | my_month ------------+------------------------------------------------ - 1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5} - 2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5} - 3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3} - 4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10} + 1.00 | {2,3,4,4,4,5,5,5,6,7,7,7,7,9,9,11,11} + 2.00 | {1,3,5,5,5,5,6,6,6,7,7,8,10,10,11,11,11,12,12} + 3.00 | {3,4,5,6,7,7,8,8,8,9,9,10,11,11} + 4.00 | {1,1,1,2,2,2,5,5,6,6,6,6,8,9,10,10,11,11,12} (4 rows) -SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5 +SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5 AND octet_length(l_comment) + octet_length('randomtext'::text) > 40 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; - l_quantity | array_agg + l_quantity | array_sort ------------+--------------------------------------------- 1.00 | {11269,11397,11713,11715,11973,18317,18445} 2.00 | {11847,18061,18247,18953} diff --git a/src/test/regress/sql/multi_array_agg.sql b/src/test/regress/sql/multi_array_agg.sql index 541d5ebd9..8688c2c2e 100644 --- a/src/test/regress/sql/multi_array_agg.sql +++ b/src/test/regress/sql/multi_array_agg.sql @@ -6,6 +6,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 520000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 520000; +CREATE OR REPLACE FUNCTION array_sort (ANYARRAY) +RETURNS ANYARRAY LANGUAGE SQL +AS $$ +SELECT ARRAY(SELECT unnest($1) ORDER BY 1) +$$; -- Check multi_cat_agg() aggregate which is used to implement array_agg() @@ -21,16 +26,16 @@ SELECT array_agg(distinct l_orderkey ORDER BY l_orderkey) FROM 
lineitem; -- Check array_agg() for different data types and LIMIT clauses -SELECT array_agg(l_partkey) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_partkey)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; -SELECT array_agg(l_extendedprice) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_extendedprice)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; -SELECT array_agg(l_shipdate) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_shipdate)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; -SELECT array_agg(l_shipmode) FROM lineitem GROUP BY l_orderkey +SELECT array_sort(array_agg(l_shipmode)) FROM lineitem GROUP BY l_orderkey ORDER BY l_orderkey LIMIT 10; -- Check that we can execute array_agg() within other functions @@ -42,15 +47,15 @@ SELECT array_length(array_agg(l_orderkey), 1) FROM lineitem; -- expressions. Note that the l_orderkey ranges are such that the matching rows -- lie in different shards. -SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM lineitem +SELECT l_quantity, count(*), avg(l_extendedprice), array_sort(array_agg(l_orderkey)) FROM lineitem WHERE l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; -SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month +SELECT l_quantity, array_sort(array_agg(extract (month FROM o_orderdate))) AS my_month FROM lineitem, orders WHERE l_orderkey = o_orderkey AND l_quantity < 5 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; -SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5 +SELECT l_quantity, array_sort(array_agg(l_orderkey * 2 + 1)) FROM lineitem WHERE l_quantity < 5 AND octet_length(l_comment) + octet_length('randomtext'::text) > 40 AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;