diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index e692c5bee..de95a36dc 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -274,6 +274,7 @@ static List * WorkerAggregateExpressionList(Aggref *originalAggregate, WorkerAggregateWalkerContext *walkerContextry); static AggregateType GetAggregateType(Aggref *aggregatExpression); static Oid AggregateArgumentType(Aggref *aggregate); +static Expr * FirstAggregateArgument(Aggref *aggregate); static bool AggregateEnabledCustom(Aggref *aggregateExpression); static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes); static Oid WorkerPartialAggOid(void); @@ -2028,6 +2029,12 @@ MasterAggregateExpression(Aggref *originalAggregate, Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType); Oid masterReturnType = get_func_rettype(aggregateFunctionId); + Aggref *newMasterAggregate = copyObject(originalAggregate); + newMasterAggregate->aggdistinct = NULL; + newMasterAggregate->aggfnoid = aggregateFunctionId; + newMasterAggregate->aggtype = masterReturnType; + newMasterAggregate->aggfilter = NULL; + /* * If return type aggregate is anyelement, its actual return type is * determined on the type of its argument. So we replace it with the @@ -2035,13 +2042,11 @@ MasterAggregateExpression(Aggref *originalAggregate, */ if (masterReturnType == ANYELEMENTOID) { - masterReturnType = workerReturnType; + newMasterAggregate->aggtype = workerReturnType; + + Expr *firstArg = FirstAggregateArgument(originalAggregate); + newMasterAggregate->aggcollid = exprCollation((Node *) firstArg); } - Aggref *newMasterAggregate = copyObject(originalAggregate); - newMasterAggregate->aggdistinct = NULL; - newMasterAggregate->aggfnoid = aggregateFunctionId; - newMasterAggregate->aggtype = masterReturnType; - newMasterAggregate->aggfilter = NULL; Var *column = makeVar(masterTableId, walkerContext->columnId, workerReturnType, workerReturnTypeMod, workerCollationId, columnLevelsUp); @@ -3213,6 +3218,22 @@ AggregateArgumentType(Aggref *aggregate) } +/* + * FirstAggregateArgument returns the first argument of the aggregate. + */ +static Expr * +FirstAggregateArgument(Aggref *aggregate) +{ + List *argumentList = aggregate->args; + + Assert(list_length(argumentList) >= 1); + + TargetEntry *argument = (TargetEntry *) linitial(argumentList); + + return argument->expr; +} + + /* * AggregateEnabledCustom returns whether given aggregate can be * distributed across workers using worker_partial_agg & coord_combine_agg. diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 9b9a9de7d..00d4908d7 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -215,7 +215,6 @@ static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex, static uint32 FinalTargetEntryCount(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); -static Node * AddAnyValueAggregates(Node *node, void *context); /* @@ -974,7 +973,7 @@ TargetEntryList(List *expressionList) * function. This is needed for repartition joins because primary keys are not * present on intermediate tables. */ -static Node * +Node * AddAnyValueAggregates(Node *node, void *context) { List *groupClauseList = context; @@ -994,6 +993,7 @@ AddAnyValueAggregates(Node *node, void *context) agg->aggtranstype = InvalidOid; agg->aggargtypes = list_make1_oid(var->vartype); agg->aggsplit = AGGSPLIT_SIMPLE; + agg->aggcollid = exprCollation((Node *) var); return (Node *) agg; } if (IsA(node, TargetEntry)) diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index bda86407a..ba2854fc3 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -1636,6 +1636,27 @@ SubqueryPushdownMultiNodeTree(Query *originalQuery) (Node *) make_ands_implicit((Expr *) extendedOpNode->havingQual); } + /* + * Group by on primary key allows all columns to appear in the target + * list, but once we wrap the join tree into a subquery the GROUP BY + * will no longer directly refer to the primary key and referencing + * columns that are not in the GROUP BY would result in an error. To + * prevent that we wrap all the columns that do not appear in the + * GROUP BY in an any_value aggregate. + */ + if (extendedOpNode->groupClauseList != NIL) + { + extendedOpNode->targetList = + (List *) expression_tree_mutator((Node *) extendedOpNode->targetList, + AddAnyValueAggregates, + extendedOpNode->groupClauseList); + + extendedOpNode->havingQual = + expression_tree_mutator((Node *) extendedOpNode->havingQual, + AddAnyValueAggregates, + extendedOpNode->groupClauseList); + } + /* * Postgres standard planner evaluates expressions in the LIMIT/OFFSET clauses. * Since we're using original query here, we should manually evaluate the diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 84112d93b..2f2a65acc 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -453,6 +453,7 @@ extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString); extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber); +extern Node * AddAnyValueAggregates(Node *node, void *context); /* * Function declarations for building, updating constraints and simple operator diff --git a/src/test/regress/expected/subquery_complex_target_list.out b/src/test/regress/expected/subquery_complex_target_list.out index 55152b68d..dfb88976e 100644 --- a/src/test/regress/expected/subquery_complex_target_list.out +++ b/src/test/regress/expected/subquery_complex_target_list.out @@ -415,5 +415,91 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT event_type, c COMMIT; SET client_min_messages TO DEFAULT; +CREATE TABLE items (key text primary key, value text not null, t timestamp); +SELECT create_distributed_table('items','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO items VALUES ('key-1','value-2', '2020-01-01 00:00'); +INSERT INTO items VALUES ('key-2','value-1', '2020-02-02 00:00'); +CREATE TABLE other_items (key text primary key, value text not null); +SELECT create_distributed_table('other_items','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO other_items VALUES ('key-1','value-2'); +-- LEFT JOINs are wrapped into a subquery under the covers, which causes GROUP BY +-- to be separated from the LEFT JOIN. If the GROUP BY is on a primary key we can +-- normally use any column even ones that are not in the GROUP BY, but not when +-- it is in the outer query. In that case, we use the any_value aggregate. +SELECT key, a.value, count(b.value), t +FROM items a LEFT JOIN other_items b USING (key) +GROUP BY key HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + key | value | count | t +--------------------------------------------------------------------- + key-2 | value-1 | 0 | Sun Feb 02 00:00:00 2020 +(1 row) + +SELECT key, a.value, count(b.value), t +FROM items a LEFT JOIN other_items b USING (key) +GROUP BY key, t HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + key | value | count | t +--------------------------------------------------------------------- + key-2 | value-1 | 0 | Sun Feb 02 00:00:00 2020 +(1 row) + +-- make sure the same logic works for regular joins +SELECT key, a.value, count(b.value), t +FROM items a JOIN other_items b USING (key) +GROUP BY key HAVING a.value = 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + key | value | count | t +--------------------------------------------------------------------- + key-1 | value-2 | 1 | Wed Jan 01 00:00:00 2020 +(1 row) + +-- subqueries also trigger wrapping +SELECT key, a.value, count(b.value), t +FROM items a JOIN (SELECT key, value, random() FROM other_items) b USING (key) +GROUP BY key ORDER BY 3, 2, 1; + key | value | count | t +--------------------------------------------------------------------- + key-1 | value-2 | 1 | Wed Jan 01 00:00:00 2020 +(1 row) + +-- pushdownable window functions also trigger wrapping +SELECT a.key, a.value, count(a.value) OVER (PARTITION BY a.key) +FROM items a JOIN other_items b ON (a.key = b.key) +GROUP BY a.key ORDER BY 3, 2, 1; + key | value | count +--------------------------------------------------------------------- + key-1 | value-2 | 1 +(1 row) + +-- left join with non-pushdownable window functions +SELECT a.key, a.value, count(a.value) OVER () +FROM items a LEFT JOIN other_items b ON (a.key = b.key) +GROUP BY a.key ORDER BY 3, 2, 1; + key | value | count +--------------------------------------------------------------------- + key-2 | value-1 | 2 + key-1 | value-2 | 2 +(2 rows) + +-- function joins (actually with read_intermediate_results) also trigger wrapping +SELECT key, a.value, sum(b) +FROM items a JOIN generate_series(1,10) b ON (a.key = 'key-'||b) +GROUP BY key ORDER BY 3, 2, 1; + key | value | sum +--------------------------------------------------------------------- + key-1 | value-2 | 1 + key-2 | value-1 | 2 +(2 rows) + DROP SCHEMA subquery_complex CASCADE; -SET search_path TO public; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table items +drop cascades to table other_items diff --git a/src/test/regress/sql/subquery_complex_target_list.sql b/src/test/regress/sql/subquery_complex_target_list.sql index 777984e9b..ae56b5b1e 100644 --- a/src/test/regress/sql/subquery_complex_target_list.sql +++ b/src/test/regress/sql/subquery_complex_target_list.sql @@ -300,5 +300,50 @@ COMMIT; SET client_min_messages TO DEFAULT; +CREATE TABLE items (key text primary key, value text not null, t timestamp); +SELECT create_distributed_table('items','key'); +INSERT INTO items VALUES ('key-1','value-2', '2020-01-01 00:00'); +INSERT INTO items VALUES ('key-2','value-1', '2020-02-02 00:00'); + +CREATE TABLE other_items (key text primary key, value text not null); +SELECT create_distributed_table('other_items','key'); +INSERT INTO other_items VALUES ('key-1','value-2'); + +-- LEFT JOINs are wrapped into a subquery under the covers, which causes GROUP BY +-- to be separated from the LEFT JOIN. If the GROUP BY is on a primary key we can +-- normally use any column even ones that are not in the GROUP BY, but not when +-- it is in the outer query. In that case, we use the any_value aggregate. +SELECT key, a.value, count(b.value), t +FROM items a LEFT JOIN other_items b USING (key) +GROUP BY key HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + +SELECT key, a.value, count(b.value), t +FROM items a LEFT JOIN other_items b USING (key) +GROUP BY key, t HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + +-- make sure the same logic works for regular joins +SELECT key, a.value, count(b.value), t +FROM items a JOIN other_items b USING (key) +GROUP BY key HAVING a.value = 'value-2' ORDER BY count(b.value), a.value LIMIT 5; + +-- subqueries also trigger wrapping +SELECT key, a.value, count(b.value), t +FROM items a JOIN (SELECT key, value, random() FROM other_items) b USING (key) +GROUP BY key ORDER BY 3, 2, 1; + +-- pushdownable window functions also trigger wrapping +SELECT a.key, a.value, count(a.value) OVER (PARTITION BY a.key) +FROM items a JOIN other_items b ON (a.key = b.key) +GROUP BY a.key ORDER BY 3, 2, 1; + +-- left join with non-pushdownable window functions +SELECT a.key, a.value, count(a.value) OVER () +FROM items a LEFT JOIN other_items b ON (a.key = b.key) +GROUP BY a.key ORDER BY 3, 2, 1; + +-- function joins (actually with read_intermediate_results) also trigger wrapping +SELECT key, a.value, sum(b) +FROM items a JOIN generate_series(1,10) b ON (a.key = 'key-'||b) +GROUP BY key ORDER BY 3, 2, 1; + DROP SCHEMA subquery_complex CASCADE; -SET search_path TO public;