diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index 1a4c55554..393175312 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -79,6 +79,9 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, hasNonPartitionColumnDistinctAgg, extendedOpNode->onlyPushableWindowFunctions); + extendedOpNodeProperties.hasGroupBy = extendedOpNode->groupClauseList != NIL; + extendedOpNodeProperties.hasAggregate = TargetListHasAggregates(targetList); + extendedOpNodeProperties.groupedByDisjointPartitionColumn = groupedByDisjointPartitionColumn; extendedOpNodeProperties.repartitionSubquery = repartitionSubquery; diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 2d7a9875e..8e158e27c 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -228,7 +228,6 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ExtendedOpNodeProperties * extendedOpNodeProperties); -static bool TargetListHasAggregates(List *targetEntryList); static void ProcessTargetListForWorkerQuery(List *targetEntryList, ExtendedOpNodeProperties * extendedOpNodeProperties, @@ -2830,7 +2829,7 @@ BuildOrderByLimitReference(bool hasDistinctOn, bool groupedByDisjointPartitionCo * target list contain aggregates that are not inside the window functions. * This function should not be called if window functions are being pulled up. 
*/ -static bool +bool TargetListHasAggregates(List *targetEntryList) { TargetEntry *targetEntry = NULL; @@ -3840,7 +3839,24 @@ CanPushDownExpression(Node *expression, bool hasWindowFunction = contain_window_function(expression); if (!hasAggregate && !hasWindowFunction) { - return true; + /* + * If the query has the form SELECT expression, agg(..) FROM table; + * then the expression should be evaluated on the coordinator. + * + * Besides being more efficient, this also avoids a crash. + * When we push an expression down to the workers, we create a + * Var reference to the worker tuples. If the workers return no + * rows but the coordinator result must still contain at least + * one row, postgres will crash when it tries to evaluate that + * Var reference. + * + * For details, see https://github.com/citusdata/citus/pull/3961 + */ + if (!extendedOpNodeProperties->hasAggregate || + extendedOpNodeProperties->hasGroupBy) + { + return true; + } } /* aggregates inside pushed down window functions can be pushed down */ diff --git a/src/include/distributed/extended_op_node_utils.h b/src/include/distributed/extended_op_node_utils.h index 5d2c7cead..e36c35a72 100644 --- a/src/include/distributed/extended_op_node_utils.h +++ b/src/include/distributed/extended_op_node_utils.h @@ -31,6 +31,12 @@ typedef struct ExtendedOpNodeProperties bool onlyPushableWindowFunctions; bool pullUpIntermediateRows; bool pushDownGroupingAndHaving; + + /* indicates whether the MultiExtendedOp has a GROUP BY */ + bool hasGroupBy; + + /* indicates whether the MultiExtendedOp has an aggregate on the target list */ + bool hasAggregate; } ExtendedOpNodeProperties; diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 6fc42f8a2..9e6167959 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -176,5 +176,6 @@ extern void 
FindReferencedTableColumn(Expr *columnExpression, List *parentQueryL Query *query, Oid *relationId, Var **column); extern char * WorkerColumnName(AttrNumber resno); extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses); +extern bool TargetListHasAggregates(List *targetEntryList); #endif /* MULTI_LOGICAL_OPTIMIZER_H */ diff --git a/src/test/regress/expected/aggregate_support.out b/src/test/regress/expected/aggregate_support.out index 0a30d061c..eacf43a32 100644 --- a/src/test/regress/expected/aggregate_support.out +++ b/src/test/regress/expected/aggregate_support.out @@ -486,5 +486,126 @@ select pg_catalog.coord_combine_agg('avg(float8)'::regprocedure, ARRAY[id,id,id] (1 row) +-- Test that we don't crash with empty resultset +-- See https://github.com/citusdata/citus/issues/3953 +CREATE TABLE t1 (a int PRIMARY KEY, b int); +CREATE TABLE t2 (a int PRIMARY KEY, b int); +SELECT create_distributed_table('t1','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT 'foo' as foo, count(distinct b) FROM t1; + foo | count +--------------------------------------------------------------------- + foo | 0 +(1 row) + +SELECT 'foo' as foo, count(distinct b) FROM t2; + foo | count +--------------------------------------------------------------------- + foo | 0 +(1 row) + +SELECT 'foo' as foo, string_agg(distinct a::character varying, ',') FROM t1; + foo | string_agg +--------------------------------------------------------------------- + foo | +(1 row) + +SELECT 'foo' as foo, string_agg(distinct a::character varying, ',') FROM t2; + foo | string_agg +--------------------------------------------------------------------- + foo | +(1 row) + +CREATE OR REPLACE FUNCTION const_function(int) +RETURNS int STABLE +LANGUAGE plpgsql +AS $function$ +BEGIN +RAISE NOTICE 'stable_fn called'; +RETURN 1; +END; +$function$; +CREATE OR REPLACE FUNCTION square_func_stable(int) +RETURNS int STABLE +LANGUAGE 
plpgsql +AS $function$ +BEGIN +RETURN $1 * $1; +END; +$function$; +CREATE OR REPLACE FUNCTION square_func(int) +RETURNS int +LANGUAGE plpgsql +AS $function$ +BEGIN +RETURN $1 * $1; +END; +$function$; +SELECT const_function(1), string_agg(a::character, ',') FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + const_function | string_agg +--------------------------------------------------------------------- + 1 | +(1 row) + +SELECT const_function(1), count(b) FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + const_function | count +--------------------------------------------------------------------- + 1 | 0 +(1 row) + +SELECT const_function(1), count(b), 10 FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + const_function | count | ?column? +--------------------------------------------------------------------- + 1 | 0 | 10 +(1 row) + +SELECT const_function(1), count(b), const_function(10) FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + const_function | count | const_function +--------------------------------------------------------------------- + 1 | 0 | 1 +(1 row) + +SELECT square_func(5), string_agg(a::character, ','),const_function(1) FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + square_func | string_agg | const_function +--------------------------------------------------------------------- + 25 | | 1 +(1 row) + +SELECT square_func_stable(5), string_agg(a::character, ','),const_function(1) FROM t1; +NOTICE: stable_fn called +CONTEXT: PL/pgSQL function const_function(integer) line 3 at RAISE + square_func_stable | string_agg | const_function 
+--------------------------------------------------------------------- + 25 | | 1 +(1 row) + +-- this will error since the expression will be +-- pushed down (group by) and the function doesn't exist on workers +SELECT square_func(5), a FROM t1 GROUP BY a; +ERROR: function aggregate_support.square_func(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +CONTEXT: while executing command on localhost:xxxxx +-- this will error since it has group by even though there is an aggregation +-- the expression will be pushed down. +SELECT square_func(5), a, count(a) FROM t1 GROUP BY a; +ERROR: function aggregate_support.square_func(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +CONTEXT: while executing command on localhost:xxxxx set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/sql/aggregate_support.sql b/src/test/regress/sql/aggregate_support.sql index bce20d014..2267a53b6 100644 --- a/src/test/regress/sql/aggregate_support.sql +++ b/src/test/regress/sql/aggregate_support.sql @@ -229,5 +229,60 @@ select pg_catalog.worker_partial_agg('sum(int)'::regprocedure, id) from nulltabl select pg_catalog.coord_combine_agg('sum(float8)'::regprocedure, id::text::cstring, null::float8) from nulltable; select pg_catalog.coord_combine_agg('avg(float8)'::regprocedure, ARRAY[id,id,id]::text::cstring, null::float8) from nulltable; + +-- Test that we don't crash with empty resultset +-- See https://github.com/citusdata/citus/issues/3953 +CREATE TABLE t1 (a int PRIMARY KEY, b int); +CREATE TABLE t2 (a int PRIMARY KEY, b int); +SELECT create_distributed_table('t1','a'); +SELECT 'foo' as foo, count(distinct b) FROM t1; +SELECT 'foo' as foo, count(distinct b) FROM t2; +SELECT 'foo' as foo, string_agg(distinct a::character varying, ',') FROM t1; +SELECT 'foo' as foo, string_agg(distinct 
a::character varying, ',') FROM t2; + + +CREATE OR REPLACE FUNCTION const_function(int) +RETURNS int STABLE +LANGUAGE plpgsql +AS $function$ +BEGIN +RAISE NOTICE 'stable_fn called'; +RETURN 1; +END; +$function$; + +CREATE OR REPLACE FUNCTION square_func_stable(int) +RETURNS int STABLE +LANGUAGE plpgsql +AS $function$ +BEGIN +RETURN $1 * $1; +END; +$function$; + +CREATE OR REPLACE FUNCTION square_func(int) +RETURNS int +LANGUAGE plpgsql +AS $function$ +BEGIN +RETURN $1 * $1; +END; +$function$; + +SELECT const_function(1), string_agg(a::character, ',') FROM t1; +SELECT const_function(1), count(b) FROM t1; +SELECT const_function(1), count(b), 10 FROM t1; +SELECT const_function(1), count(b), const_function(10) FROM t1; +SELECT square_func(5), string_agg(a::character, ','),const_function(1) FROM t1; +SELECT square_func_stable(5), string_agg(a::character, ','),const_function(1) FROM t1; + +-- this will error since the expression will be +-- pushed down (group by) and the function doesn't exist on workers +SELECT square_func(5), a FROM t1 GROUP BY a; +-- this will error since it has group by even though there is an aggregation +-- the expression will be pushed down. +SELECT square_func(5), a, count(a) FROM t1 GROUP BY a; + + set client_min_messages to error; drop schema aggregate_support cascade;