mirror of https://github.com/citusdata/citus.git
Fix error when using LEFT JOIN with GROUP BY on primary key
parent
e1802c5c00
commit
331b45348c
|
@ -274,6 +274,7 @@ static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
|
||||||
WorkerAggregateWalkerContext *walkerContextry);
|
WorkerAggregateWalkerContext *walkerContextry);
|
||||||
static AggregateType GetAggregateType(Aggref *aggregatExpression);
|
static AggregateType GetAggregateType(Aggref *aggregatExpression);
|
||||||
static Oid AggregateArgumentType(Aggref *aggregate);
|
static Oid AggregateArgumentType(Aggref *aggregate);
|
||||||
|
static Expr * FirstAggregateArgument(Aggref *aggregate);
|
||||||
static bool AggregateEnabledCustom(Aggref *aggregateExpression);
|
static bool AggregateEnabledCustom(Aggref *aggregateExpression);
|
||||||
static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes);
|
static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes);
|
||||||
static Oid WorkerPartialAggOid(void);
|
static Oid WorkerPartialAggOid(void);
|
||||||
|
@ -2028,6 +2029,12 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType);
|
Oid aggregateFunctionId = AggregateFunctionOid(aggregateName, workerReturnType);
|
||||||
Oid masterReturnType = get_func_rettype(aggregateFunctionId);
|
Oid masterReturnType = get_func_rettype(aggregateFunctionId);
|
||||||
|
|
||||||
|
Aggref *newMasterAggregate = copyObject(originalAggregate);
|
||||||
|
newMasterAggregate->aggdistinct = NULL;
|
||||||
|
newMasterAggregate->aggfnoid = aggregateFunctionId;
|
||||||
|
newMasterAggregate->aggtype = masterReturnType;
|
||||||
|
newMasterAggregate->aggfilter = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If return type aggregate is anyelement, its actual return type is
|
* If return type aggregate is anyelement, its actual return type is
|
||||||
* determined on the type of its argument. So we replace it with the
|
* determined on the type of its argument. So we replace it with the
|
||||||
|
@ -2035,13 +2042,11 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
*/
|
*/
|
||||||
if (masterReturnType == ANYELEMENTOID)
|
if (masterReturnType == ANYELEMENTOID)
|
||||||
{
|
{
|
||||||
masterReturnType = workerReturnType;
|
newMasterAggregate->aggtype = workerReturnType;
|
||||||
|
|
||||||
|
Expr *firstArg = FirstAggregateArgument(originalAggregate);
|
||||||
|
newMasterAggregate->aggcollid = exprCollation((Node *) firstArg);
|
||||||
}
|
}
|
||||||
Aggref *newMasterAggregate = copyObject(originalAggregate);
|
|
||||||
newMasterAggregate->aggdistinct = NULL;
|
|
||||||
newMasterAggregate->aggfnoid = aggregateFunctionId;
|
|
||||||
newMasterAggregate->aggtype = masterReturnType;
|
|
||||||
newMasterAggregate->aggfilter = NULL;
|
|
||||||
|
|
||||||
Var *column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
|
Var *column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
|
||||||
workerReturnTypeMod, workerCollationId, columnLevelsUp);
|
workerReturnTypeMod, workerCollationId, columnLevelsUp);
|
||||||
|
@ -3213,6 +3218,22 @@ AggregateArgumentType(Aggref *aggregate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FirstAggregateArgument returns the first argument of the aggregate.
|
||||||
|
*/
|
||||||
|
static Expr *
|
||||||
|
FirstAggregateArgument(Aggref *aggregate)
|
||||||
|
{
|
||||||
|
List *argumentList = aggregate->args;
|
||||||
|
|
||||||
|
Assert(list_length(argumentList) >= 1);
|
||||||
|
|
||||||
|
TargetEntry *argument = (TargetEntry *) linitial(argumentList);
|
||||||
|
|
||||||
|
return argument->expr;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AggregateEnabledCustom returns whether given aggregate can be
|
* AggregateEnabledCustom returns whether given aggregate can be
|
||||||
* distributed across workers using worker_partial_agg & coord_combine_agg.
|
* distributed across workers using worker_partial_agg & coord_combine_agg.
|
||||||
|
|
|
@ -215,7 +215,6 @@ static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex,
|
||||||
static uint32 FinalTargetEntryCount(List *targetEntryList);
|
static uint32 FinalTargetEntryCount(List *targetEntryList);
|
||||||
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
|
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
|
||||||
ShardInterval *secondInterval);
|
ShardInterval *secondInterval);
|
||||||
static Node * AddAnyValueAggregates(Node *node, void *context);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -974,7 +973,7 @@ TargetEntryList(List *expressionList)
|
||||||
* function. This is needed for repartition joins because primary keys are not
|
* function. This is needed for repartition joins because primary keys are not
|
||||||
* present on intermediate tables.
|
* present on intermediate tables.
|
||||||
*/
|
*/
|
||||||
static Node *
|
Node *
|
||||||
AddAnyValueAggregates(Node *node, void *context)
|
AddAnyValueAggregates(Node *node, void *context)
|
||||||
{
|
{
|
||||||
List *groupClauseList = context;
|
List *groupClauseList = context;
|
||||||
|
@ -994,6 +993,7 @@ AddAnyValueAggregates(Node *node, void *context)
|
||||||
agg->aggtranstype = InvalidOid;
|
agg->aggtranstype = InvalidOid;
|
||||||
agg->aggargtypes = list_make1_oid(var->vartype);
|
agg->aggargtypes = list_make1_oid(var->vartype);
|
||||||
agg->aggsplit = AGGSPLIT_SIMPLE;
|
agg->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
agg->aggcollid = exprCollation((Node *) var);
|
||||||
return (Node *) agg;
|
return (Node *) agg;
|
||||||
}
|
}
|
||||||
if (IsA(node, TargetEntry))
|
if (IsA(node, TargetEntry))
|
||||||
|
|
|
@ -1636,6 +1636,27 @@ SubqueryPushdownMultiNodeTree(Query *originalQuery)
|
||||||
(Node *) make_ands_implicit((Expr *) extendedOpNode->havingQual);
|
(Node *) make_ands_implicit((Expr *) extendedOpNode->havingQual);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Group by on primary key allows all columns to appear in the target
|
||||||
|
* list, but once we wrap the join tree into a subquery the GROUP BY
|
||||||
|
* will no longer directly refer to the primary key and referencing
|
||||||
|
* columns that are not in the GROUP BY would result in an error. To
|
||||||
|
* prevent that we wrap all the columns that do not appear in the
|
||||||
|
* GROUP BY in an any_value aggregate.
|
||||||
|
*/
|
||||||
|
if (extendedOpNode->groupClauseList != NIL)
|
||||||
|
{
|
||||||
|
extendedOpNode->targetList =
|
||||||
|
(List *) expression_tree_mutator((Node *) extendedOpNode->targetList,
|
||||||
|
AddAnyValueAggregates,
|
||||||
|
extendedOpNode->groupClauseList);
|
||||||
|
|
||||||
|
extendedOpNode->havingQual =
|
||||||
|
expression_tree_mutator((Node *) extendedOpNode->havingQual,
|
||||||
|
AddAnyValueAggregates,
|
||||||
|
extendedOpNode->groupClauseList);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Postgres standard planner evaluates expressions in the LIMIT/OFFSET clauses.
|
* Postgres standard planner evaluates expressions in the LIMIT/OFFSET clauses.
|
||||||
* Since we're using original query here, we should manually evaluate the
|
* Since we're using original query here, we should manually evaluate the
|
||||||
|
|
|
@ -453,6 +453,7 @@ extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
|
||||||
char *queryString);
|
char *queryString);
|
||||||
|
|
||||||
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
||||||
|
extern Node * AddAnyValueAggregates(Node *node, void *context);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Function declarations for building, updating constraints and simple operator
|
* Function declarations for building, updating constraints and simple operator
|
||||||
|
|
|
@ -415,5 +415,91 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT event_type, c
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
CREATE TABLE items (key text primary key, value text not null, t timestamp);
|
||||||
|
SELECT create_distributed_table('items','key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO items VALUES ('key-1','value-2', '2020-01-01 00:00');
|
||||||
|
INSERT INTO items VALUES ('key-2','value-1', '2020-02-02 00:00');
|
||||||
|
CREATE TABLE other_items (key text primary key, value text not null);
|
||||||
|
SELECT create_distributed_table('other_items','key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO other_items VALUES ('key-1','value-2');
|
||||||
|
-- LEFT JOINs are wrapped into a subquery under the covers, which causes GROUP BY
|
||||||
|
-- to be separated from the LEFT JOIN. If the GROUP BY is on a primary key we can
|
||||||
|
-- normally use any column even ones that are not in the GROUP BY, but not when
|
||||||
|
-- it is in the outer query. In that case, we use the any_value aggregate.
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a LEFT JOIN other_items b USING (key)
|
||||||
|
GROUP BY key HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
key | value | count | t
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-2 | value-1 | 0 | Sun Feb 02 00:00:00 2020
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a LEFT JOIN other_items b USING (key)
|
||||||
|
GROUP BY key, t HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
key | value | count | t
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-2 | value-1 | 0 | Sun Feb 02 00:00:00 2020
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- make sure the same logic works for regular joins
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a JOIN other_items b USING (key)
|
||||||
|
GROUP BY key HAVING a.value = 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
key | value | count | t
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-1 | value-2 | 1 | Wed Jan 01 00:00:00 2020
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- subqueries also trigger wrapping
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a JOIN (SELECT key, value, random() FROM other_items) b USING (key)
|
||||||
|
GROUP BY key ORDER BY 3, 2, 1;
|
||||||
|
key | value | count | t
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-1 | value-2 | 1 | Wed Jan 01 00:00:00 2020
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- pushdownable window functions also trigger wrapping
|
||||||
|
SELECT a.key, a.value, count(a.value) OVER (PARTITION BY a.key)
|
||||||
|
FROM items a JOIN other_items b ON (a.key = b.key)
|
||||||
|
GROUP BY a.key ORDER BY 3, 2, 1;
|
||||||
|
key | value | count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-1 | value-2 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- left join with non-pushdownable window functions
|
||||||
|
SELECT a.key, a.value, count(a.value) OVER ()
|
||||||
|
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
|
||||||
|
GROUP BY a.key ORDER BY 3, 2, 1;
|
||||||
|
key | value | count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-2 | value-1 | 2
|
||||||
|
key-1 | value-2 | 2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- function joins (actually with read_intermediate_results) also trigger wrapping
|
||||||
|
SELECT key, a.value, sum(b)
|
||||||
|
FROM items a JOIN generate_series(1,10) b ON (a.key = 'key-'||b)
|
||||||
|
GROUP BY key ORDER BY 3, 2, 1;
|
||||||
|
key | value | sum
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
key-1 | value-2 | 1
|
||||||
|
key-2 | value-1 | 2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
DROP SCHEMA subquery_complex CASCADE;
|
DROP SCHEMA subquery_complex CASCADE;
|
||||||
SET search_path TO public;
|
NOTICE: drop cascades to 2 other objects
|
||||||
|
DETAIL: drop cascades to table items
|
||||||
|
drop cascades to table other_items
|
||||||
|
|
|
@ -300,5 +300,50 @@ COMMIT;
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
|
CREATE TABLE items (key text primary key, value text not null, t timestamp);
|
||||||
|
SELECT create_distributed_table('items','key');
|
||||||
|
INSERT INTO items VALUES ('key-1','value-2', '2020-01-01 00:00');
|
||||||
|
INSERT INTO items VALUES ('key-2','value-1', '2020-02-02 00:00');
|
||||||
|
|
||||||
|
CREATE TABLE other_items (key text primary key, value text not null);
|
||||||
|
SELECT create_distributed_table('other_items','key');
|
||||||
|
INSERT INTO other_items VALUES ('key-1','value-2');
|
||||||
|
|
||||||
|
-- LEFT JOINs are wrapped into a subquery under the covers, which causes GROUP BY
|
||||||
|
-- to be separated from the LEFT JOIN. If the GROUP BY is on a primary key we can
|
||||||
|
-- normally use any column even ones that are not in the GROUP BY, but not when
|
||||||
|
-- it is in the outer query. In that case, we use the any_value aggregate.
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a LEFT JOIN other_items b USING (key)
|
||||||
|
GROUP BY key HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a LEFT JOIN other_items b USING (key)
|
||||||
|
GROUP BY key, t HAVING a.value != 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
|
||||||
|
-- make sure the same logic works for regular joins
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a JOIN other_items b USING (key)
|
||||||
|
GROUP BY key HAVING a.value = 'value-2' ORDER BY count(b.value), a.value LIMIT 5;
|
||||||
|
|
||||||
|
-- subqueries also trigger wrapping
|
||||||
|
SELECT key, a.value, count(b.value), t
|
||||||
|
FROM items a JOIN (SELECT key, value, random() FROM other_items) b USING (key)
|
||||||
|
GROUP BY key ORDER BY 3, 2, 1;
|
||||||
|
|
||||||
|
-- pushdownable window functions also trigger wrapping
|
||||||
|
SELECT a.key, a.value, count(a.value) OVER (PARTITION BY a.key)
|
||||||
|
FROM items a JOIN other_items b ON (a.key = b.key)
|
||||||
|
GROUP BY a.key ORDER BY 3, 2, 1;
|
||||||
|
|
||||||
|
-- left join with non-pushdownable window functions
|
||||||
|
SELECT a.key, a.value, count(a.value) OVER ()
|
||||||
|
FROM items a LEFT JOIN other_items b ON (a.key = b.key)
|
||||||
|
GROUP BY a.key ORDER BY 3, 2, 1;
|
||||||
|
|
||||||
|
-- function joins (actually with read_intermediate_results) also trigger wrapping
|
||||||
|
SELECT key, a.value, sum(b)
|
||||||
|
FROM items a JOIN generate_series(1,10) b ON (a.key = 'key-'||b)
|
||||||
|
GROUP BY key ORDER BY 3, 2, 1;
|
||||||
|
|
||||||
DROP SCHEMA subquery_complex CASCADE;
|
DROP SCHEMA subquery_complex CASCADE;
|
||||||
SET search_path TO public;
|
|
||||||
|
|
Loading…
Reference in New Issue