Make sure we don't wrap GROUP BY expressions in any_value

pull/3814/head
Marco Slot 2020-04-30 10:41:33 +02:00
parent d4943cee55
commit 6ce2803777
5 changed files with 238 additions and 39 deletions

View File

@ -95,6 +95,24 @@ bool EnableUniqueJobIds = true;
static List *OperatorCache = NIL;
/* context passed down in AddAnyValueAggregates mutator */
typedef struct AddAnyValueAggregatesContext
{
/* SortGroupClauses corresponding to the GROUP BY clause */
List *groupClauseList;
/* TargetEntry's to which the GROUP BY clauses refer */
List *groupByTargetEntryList;
/*
* haveNonVarGrouping is true if there are expressions in the
* GROUP BY target entries. We use this as an optimisation to
* skip expensive checks when possible.
*/
bool haveNonVarGrouping;
} AddAnyValueAggregatesContext;
/* Local functions forward declarations for job creation */
static Job * BuildJobTree(MultiTreeRoot *multiTree);
static MultiNode * LeftMostNode(MultiTreeRoot *multiTree);
@ -105,6 +123,7 @@ static Query * BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependent
static List * BaseRangeTableList(MultiNode *multiNode);
static List * QueryTargetList(MultiNode *multiNode);
static List * TargetEntryList(List *expressionList);
static Node * AddAnyValueAggregates(Node *node, AddAnyValueAggregatesContext *context);
static List * QueryGroupClauseList(MultiNode *multiNode);
static List * QuerySelectClauseList(MultiNode *multiNode);
static List * QueryJoinClauseList(MultiNode *multiNode);
@ -696,13 +715,11 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
*/
if (groupClauseList != NIL && isRepartitionJoin)
{
targetList = (List *) expression_tree_mutator((Node *) targetList,
AddAnyValueAggregates,
groupClauseList);
targetList = (List *) WrapUngroupedVarsInAnyValueAggregate(
(Node *) targetList, groupClauseList, targetList, true);
havingQual = expression_tree_mutator((Node *) havingQual,
AddAnyValueAggregates,
groupClauseList);
havingQual = WrapUngroupedVarsInAnyValueAggregate(
(Node *) havingQual, groupClauseList, targetList, false);
}
/*
@ -974,23 +991,116 @@ TargetEntryList(List *expressionList)
/*
* AddAnyValueAggregates wraps all vars that do not appear in the GROUP BY
* clause or are inside an aggregate function in an any_value aggregate
* function. This is needed for repartition joins because primary keys are not
* present on intermediate tables.
* WrapUngroupedVarsInAnyValueAggregate finds Var nodes in the expression
* that do not refer to any GROUP BY column and wraps them in an any_value
* aggregate. These columns are allowed when the GROUP BY is on a primary
* key of a relation, but not if we wrap the relation in a subquery.
* However, since we still know the value is unique, any_value gives the
* right result.
*/
Node *
AddAnyValueAggregates(Node *node, void *context)
WrapUngroupedVarsInAnyValueAggregate(Node *expression, List *groupClauseList,
List *targetList, bool checkExpressionEquality)
{
if (expression == NULL)
{
return NULL;
}
AddAnyValueAggregatesContext context;
context.groupClauseList = groupClauseList;
context.groupByTargetEntryList = GroupTargetEntryList(groupClauseList, targetList);
context.haveNonVarGrouping = false;
if (checkExpressionEquality)
{
/*
* If the GROUP BY contains non-Var expressions, we need to do an expensive
* subexpression equality check.
*/
TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, context.groupByTargetEntryList)
{
if (!IsA(targetEntry->expr, Var))
{
context.haveNonVarGrouping = true;
break;
}
}
}
/* put the result in the same memory context */
MemoryContext nodeContext = GetMemoryChunkContext(expression);
MemoryContext oldContext = MemoryContextSwitchTo(nodeContext);
Node *result = expression_tree_mutator(expression, AddAnyValueAggregates,
&context);
MemoryContextSwitchTo(oldContext);
return result;
}
/*
* AddAnyValueAggregates wraps all vars that do not appear in the GROUP BY
* clause or are inside an aggregate function in an any_value aggregate
* function. This is needed because postgres allows columns that are not
* in the GROUP BY to appear on the target list as long as the primary key
* of the table is in the GROUP BY, but we sometimes wrap the join tree
* in a subquery in which case the primary key information is lost.
*
* This function copies parts of the node tree, but may contain references
* to the original node tree.
*
* The implementation is derived from / inspired by
* check_ungrouped_columns_walker.
*/
static Node *
AddAnyValueAggregates(Node *node, AddAnyValueAggregatesContext *context)
{
List *groupClauseList = context;
if (node == NULL)
{
return node;
}
if (IsA(node, Var))
if (IsA(node, Aggref) || IsA(node, GroupingFunc))
{
/* any column is allowed to appear in an aggregate or grouping */
return node;
}
else if (IsA(node, Var))
{
Var *var = (Var *) node;
/*
* Check whether this Var appears in the GROUP BY.
*/
TargetEntry *groupByTargetEntry = NULL;
foreach_ptr(groupByTargetEntry, context->groupByTargetEntryList)
{
if (!IsA(groupByTargetEntry->expr, Var))
{
continue;
}
Var *groupByVar = (Var *) groupByTargetEntry->expr;
/* we should only be doing this at the top level of the query */
Assert(groupByVar->varlevelsup == 0);
if (var->varno == groupByVar->varno &&
var->varattno == groupByVar->varattno)
{
/* this Var is in the GROUP BY, do not wrap it */
return node;
}
}
/*
* We have found a Var that does not appear in the GROUP BY.
* Wrap it in an any_value aggregate.
*/
Aggref *agg = makeNode(Aggref);
agg->aggfnoid = CitusAnyValueFunctionId();
agg->aggtype = var->vartype;
@ -1002,31 +1112,24 @@ AddAnyValueAggregates(Node *node, void *context)
agg->aggcollid = exprCollation((Node *) var);
return (Node *) agg;
}
if (IsA(node, TargetEntry))
else if (context->haveNonVarGrouping)
{
TargetEntry *targetEntry = (TargetEntry *) node;
/*
* Stop searching this part of the tree if the targetEntry is part of
* the group by clause.
* The GROUP BY contains at least one expression. Check whether the
* current expression is equal to one of the GROUP BY expressions.
* Otherwise, continue to descend into subexpressions.
*/
if (targetEntry->ressortgroupref != 0)
TargetEntry *groupByTargetEntry = NULL;
foreach_ptr(groupByTargetEntry, context->groupByTargetEntryList)
{
SortGroupClause *sortGroupClause = NULL;
foreach_ptr(sortGroupClause, groupClauseList)
if (equal(node, groupByTargetEntry->expr))
{
if (sortGroupClause->tleSortGroupRef == targetEntry->ressortgroupref)
{
return node;
}
/* do not descend into mutator, all Vars are safe */
return node;
}
}
}
if (IsA(node, Aggref) || IsA(node, GroupingFunc))
{
return node;
}
return expression_tree_mutator(node, AddAnyValueAggregates, context);
}

View File

@ -1648,15 +1648,15 @@ SubqueryPushdownMultiNodeTree(Query *originalQuery)
*/
if (extendedOpNode->groupClauseList != NIL)
{
extendedOpNode->targetList =
(List *) expression_tree_mutator((Node *) extendedOpNode->targetList,
AddAnyValueAggregates,
extendedOpNode->groupClauseList);
extendedOpNode->targetList = (List *) WrapUngroupedVarsInAnyValueAggregate(
(Node *) extendedOpNode->targetList,
extendedOpNode->groupClauseList,
extendedOpNode->targetList, true);
extendedOpNode->havingQual =
expression_tree_mutator((Node *) extendedOpNode->havingQual,
AddAnyValueAggregates,
extendedOpNode->groupClauseList);
extendedOpNode->havingQual = WrapUngroupedVarsInAnyValueAggregate(
(Node *) extendedOpNode->havingQual,
extendedOpNode->groupClauseList,
extendedOpNode->targetList, false);
}
/*

View File

@ -497,7 +497,10 @@ extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node * AddAnyValueAggregates(Node *node, void *context);
extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
List *groupClauseList,
List *targetList,
bool checkExpressionEquality);
/*
* Function declarations for building, updating constraints and simple operator

View File

@ -509,6 +509,62 @@ GROUP BY a.key ORDER BY 3, 2, 1;
key-1 | (key-1,value-2,"Wed Jan 01 00:00:00 2020") | 1
(2 rows)
-- Of the target list entries, v1-v3 should be wrapped in any_value as they do
-- not appear in GROUP BY. The append happens on the coordinator in that case.
-- Vars in the HAVING that do not appear in the GROUP BY are also wrapped.
SELECT
a.key as k1,
a.key as k2,
a.key || '_append' as k3,
a.value as v1,
a.value as v2,
a.value || '_notgrouped' as v3,
a.value || '_append' as va1,
a.value || '_append' as va2,
a.value || '_append' || '_more' as va2,
count(*)
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
GROUP BY a.key, a.value ||'_append'
HAVING length(a.key) + length(a.value) < length(a.value || '_append')
ORDER BY 1;
k1 | k2 | k3 | v1 | v2 | v3 | va1 | va2 | va2 | count
---------------------------------------------------------------------
key-1 | key-1 | key-1_append | value-2 | value-2 | value-2_notgrouped | value-2_append | value-2_append | value-2_append_more | 1
key-2 | key-2 | key-2_append | value-1 | value-1 | value-1_notgrouped | value-1_append | value-1_append | value-1_append_more | 1
(2 rows)
SELECT coordinator_plan($$
EXPLAIN (VERBOSE ON, COSTS OFF)
SELECT
a.key as k1,
a.key as k2,
a.key || '_append' as k3,
a.value as v1,
a.value as v2,
a.value || '_notgrouped' as v3,
a.value || '_append' as va1,
a.value || '_append' as va2,
a.value || '_append' || '_more' as va3,
count(*)
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
GROUP BY a.key, a.value ||'_append'
HAVING length(a.key) + length(a.value) < length(a.value || '_append')
ORDER BY 1
$$);
coordinator_plan
---------------------------------------------------------------------
Sort
Output: remote_scan.k1, remote_scan.k2, remote_scan.k3, (any_value(remote_scan.v1)), (any_value(remote_scan.v2)), ((any_value(remote_scan.v3) || '_notgrouped'::text)), remote_scan.va1, remote_scan.va2, remote_scan.va3, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))
Sort Key: remote_scan.k1
-> HashAggregate
Output: remote_scan.k1, remote_scan.k2, remote_scan.k3, any_value(remote_scan.v1), any_value(remote_scan.v2), (any_value(remote_scan.v3) || '_notgrouped'::text), remote_scan.va1, remote_scan.va2, remote_scan.va3, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
Group Key: remote_scan.k1, remote_scan.va1
Filter: ((length(remote_scan.worker_column_11) + length(any_value(remote_scan.worker_column_12))) < length((any_value(remote_scan.worker_column_13) || '_append'::text)))
-> Custom Scan (Citus Adaptive)
Output: remote_scan.k1, remote_scan.k2, remote_scan.k3, remote_scan.v1, remote_scan.v2, remote_scan.v3, remote_scan.va1, remote_scan.va2, remote_scan.va3, remote_scan.count, remote_scan.worker_column_11, remote_scan.worker_column_12, remote_scan.worker_column_13
Task Count: 4
(10 rows)
SELECT a FROM items a ORDER BY key;
a
---------------------------------------------------------------------

View File

@ -351,6 +351,43 @@ SELECT a.key, a, count(b)
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
GROUP BY a.key ORDER BY 3, 2, 1;
-- Of the target list entries, v1-v3 should be wrapped in any_value as they do
-- not appear in GROUP BY. The append happens on the coordinator in that case.
-- Vars in the HAVING that do not appear in the GROUP BY are also wrapped.
SELECT
a.key as k1,
a.key as k2,
a.key || '_append' as k3,
a.value as v1,
a.value as v2,
a.value || '_notgrouped' as v3,
a.value || '_append' as va1,
a.value || '_append' as va2,
a.value || '_append' || '_more' as va2,
count(*)
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
GROUP BY a.key, a.value ||'_append'
HAVING length(a.key) + length(a.value) < length(a.value || '_append')
ORDER BY 1;
SELECT coordinator_plan($$
EXPLAIN (VERBOSE ON, COSTS OFF)
SELECT
a.key as k1,
a.key as k2,
a.key || '_append' as k3,
a.value as v1,
a.value as v2,
a.value || '_notgrouped' as v3,
a.value || '_append' as va1,
a.value || '_append' as va2,
a.value || '_append' || '_more' as va3,
count(*)
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
GROUP BY a.key, a.value ||'_append'
HAVING length(a.key) + length(a.value) < length(a.value || '_append')
ORDER BY 1
$$);
SELECT a FROM items a ORDER BY key;
SELECT a FROM items a WHERE key = 'key-1';
SELECT a FROM (SELECT a, random() FROM items a) b ORDER BY a;