Fix coordinator/worker query targetlists for agg. that we cannot push-down (#5679)

Previously, we were wrapping targetlist nodes with Vars that reference
to the result of the worker query, if the node itself is not `Const` or
not a `Param`. Indeed, we should not do that unless the node itself is
a `Var` node or contains a `Var` within it (e.g.: `OpExpr(Var(column_a) > 2)`).
Otherwise, when worker query returns empty result set, then combine
query exec would crash since the `Var` would be pointing to an empty
tuple slot, which is not desirable for the node-executor methods.
pull/5676/head
Onur Tirtir 2022-02-04 16:37:25 +03:00 committed by GitHub
parent b9b4833710
commit 79442df1b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 179 additions and 2 deletions

View File

@ -1616,7 +1616,19 @@ MasterAggregateExpression(Aggref *originalAggregate,
Expr *directarg;
foreach_ptr(directarg, originalAggregate->aggdirectargs)
{
if (!IsA(directarg, Const) && !IsA(directarg, Param))
/*
* Need to replace nodes that contain any Vars with Vars referring
* to the related column of the result set returned for the worker
* aggregation.
*
* When there are no Vars, then the expression can be fully evaluated
* on the coordinator, so we skip it here. This is not just an
* optimization, but the result of the expression might require
* calling the final function of the aggregate, and doing so when
* there are no input rows (i.e.: with an empty tuple slot) is not
* desirable for the node-executor methods.
*/
if (pull_var_clause_default((Node *) directarg) != NIL)
{
Var *var = makeVar(masterTableId, walkerContext->columnId,
exprType((Node *) directarg),
@ -3070,7 +3082,13 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
Expr *directarg;
foreach_ptr(directarg, originalAggregate->aggdirectargs)
{
if (!IsA(directarg, Const) && !IsA(directarg, Param))
/*
* The worker aggregation should execute any node that contains any
* Var nodes and return the result in the targetlist, so that the
* combine query can then fetch the result via remote scan; see
* MasterAggregateExpression.
*/
if (pull_var_clause_default((Node *) directarg) != NIL)
{
workerAggregateList = lappend(workerAggregateList, directarg);
}

View File

@ -712,6 +712,19 @@ select array_agg(val order by valf) from aggdata;
{0,NULL,2,3,5,2,4,NULL,NULL,8,NULL}
(1 row)
-- test by using some other node types as arguments to agg
select key, percentile_cont((key - (key > 4)::int) / 10.0) within group(order by val) from aggdata group by key;
key | percentile_cont
---------------------------------------------------------------------
1 | 2
2 | 2.4
3 | 4
5 |
6 |
7 | 8
9 | 0
(7 rows)
-- Test TransformSubqueryNode
select * FROM (
SELECT key, mode() within group (order by floor(agg1.val/2)) m from aggdata agg1
@ -932,5 +945,100 @@ SELECT square_func(5), a, count(a) FROM t1 GROUP BY a;
ERROR: function aggregate_support.square_func(integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:xxxxx
-- Test the cases where the worker agg exec. returns no tuples.
CREATE TABLE dist_table (dist_col int, agg_col numeric);
SELECT create_distributed_table('dist_table', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table (int_col int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
(1 row)
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random() FROM dist_table) a;
percentile_disc
---------------------------------------------------------------------
(1 row)
SELECT PERCENTILE_DISC((2 > random())::int::numeric / 10) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
(1 row)
SELECT SUM(COALESCE(agg_col, 3))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
sum
---------------------------------------------------------------------
(1 row)
SELECT AVG(COALESCE(agg_col, 10))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
avg
---------------------------------------------------------------------
(1 row)
insert into dist_table values (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19);
-- run the same queries after loading some data
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
3.22
(1 row)
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random() FROM dist_table) a;
percentile_disc
---------------------------------------------------------------------
3.22
(1 row)
SELECT PERCENTILE_DISC((2 > random())::int::numeric / 10) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
percentile_disc
---------------------------------------------------------------------
1.19
(1 row)
SELECT floor(SUM(COALESCE(agg_col, 3)))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
floor
---------------------------------------------------------------------
1178
(1 row)
SELECT floor(AVG(COALESCE(agg_col, 10)))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
floor
---------------------------------------------------------------------
109
(1 row)
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -376,6 +376,9 @@ select percentile_cont(0.5) within group(order by valf) from aggdata;
select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key;
select array_agg(val order by valf) from aggdata;
-- test by using some other node types as arguments to agg
select key, percentile_cont((key - (key > 4)::int) / 10.0) within group(order by val) from aggdata group by key;
-- Test TransformSubqueryNode
select * FROM (
@ -479,6 +482,54 @@ SELECT square_func(5), a FROM t1 GROUP BY a;
-- the expression will be pushed down.
SELECT square_func(5), a, count(a) FROM t1 GROUP BY a;
-- Test the cases where the worker agg exec. returns no tuples.
CREATE TABLE dist_table (dist_col int, agg_col numeric);
SELECT create_distributed_table('dist_table', 'dist_col');
CREATE TABLE ref_table (int_col int);
SELECT create_reference_table('ref_table');
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random() FROM dist_table) a;
SELECT PERCENTILE_DISC((2 > random())::int::numeric / 10) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT SUM(COALESCE(agg_col, 3))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT AVG(COALESCE(agg_col, 10))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
insert into dist_table values (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19);
-- run the same queries after loading some data
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col)
FROM (SELECT *, random() FROM dist_table) a;
SELECT PERCENTILE_DISC((2 > random())::int::numeric / 10) WITHIN GROUP (ORDER BY agg_col)
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT floor(SUM(COALESCE(agg_col, 3)))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
SELECT floor(AVG(COALESCE(agg_col, 10)))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
set client_min_messages to error;
drop schema aggregate_support cascade;