Distribute custom aggregates with multiple arguments (#4047)

Enable custom aggregates with multiple parameters to be executed on workers.

#2921 introduces distributed execution of custom aggregates. One of the limitations of this feature is that only aggregate functions with a single aggregation parameter can be pushed to worker nodes. Aim of this change is to remove that limitation and support handling of multi-parameter aggregates.

Resolves: #3997
See also: #2921
pull/3966/head
Benjamin Satzger 2020-07-24 15:16:00 -07:00 committed by GitHub
parent c73563c340
commit a35a15a513
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 862 additions and 93 deletions

View File

@ -3298,21 +3298,53 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
Const *aggOidParam = makeConst(REGPROCEDUREOID, -1, InvalidOid, sizeof(Oid),
ObjectIdGetDatum(originalAggregate->aggfnoid),
false, true);
List *aggArguments = list_make1(makeTargetEntry((Expr *) aggOidParam, 1, NULL,
false));
TargetEntry *arg = NULL;
foreach_ptr(arg, originalAggregate->args)
List *newWorkerAggregateArgs =
list_make1(makeTargetEntry((Expr *) aggOidParam, 1, NULL, false));
if (list_length(originalAggregate->args) == 1)
{
TargetEntry *newArg = copyObject(arg);
/*
* Single argument case, append 'arg' to worker_partial_agg(agg, arg).
* We don't wrap single argument in a row expression because
* it has performance implications to unwrap arguments on each
* SFUNC invocation.
*/
TargetEntry *newArg =
copyObject((TargetEntry *) linitial(originalAggregate->args));
newArg->resno++;
aggArguments = lappend(aggArguments, newArg);
newWorkerAggregateArgs = lappend(newWorkerAggregateArgs, newArg);
}
else
{
/*
* Aggregation on workers assumes a single aggregation parameter.
* To still be able to handle multiple parameters, we combine
* parameters into a single row expression, i.e., append 'ROW(...args)'
* to worker_partial_agg(agg, ROW(...args)).
*/
RowExpr *rowExpr = makeNode(RowExpr);
rowExpr->row_typeid = RECORDOID;
rowExpr->row_format = COERCE_EXPLICIT_CALL;
rowExpr->location = -1;
rowExpr->colnames = NIL;
TargetEntry *arg = NULL;
foreach_ptr(arg, originalAggregate->args)
{
rowExpr->args = lappend(rowExpr->args, copyObject(arg->expr));
}
newWorkerAggregateArgs =
lappend(newWorkerAggregateArgs,
makeTargetEntry((Expr *) rowExpr, 2, NULL, false));
}
/* worker_partial_agg(agg, ...args) */
/* worker_partial_agg(agg, arg) or worker_partial_agg(agg, ROW(...args)) */
Aggref *newWorkerAggregate = copyObject(originalAggregate);
newWorkerAggregate->aggfnoid = workerPartialId;
newWorkerAggregate->aggtype = CSTRINGOID;
newWorkerAggregate->args = aggArguments;
newWorkerAggregate->args = newWorkerAggregateArgs;
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
newWorkerAggregate->aggtranstype = INTERNALOID;
newWorkerAggregate->aggargtypes = lcons_oid(OIDOID,
@ -3485,7 +3517,7 @@ static bool
AggregateEnabledCustom(Aggref *aggregateExpression)
{
if (aggregateExpression->aggorder != NIL ||
list_length(aggregateExpression->args) != 1)
list_length(aggregateExpression->args) == 0)
{
return false;
}

View File

@ -27,6 +27,7 @@
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pg_config_manual.h"
@ -36,6 +37,26 @@ PG_FUNCTION_INFO_V1(worker_partial_agg_ffunc);
PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
/*
* Holds information describing the structure of aggregation arguments
* and helps to efficiently handle both a single argument and multiple
* arguments wrapped in a tuple/record. It exploits the fact that
* aggregation argument types do not change between subsequent
* calls to SFUNC.
*/
typedef struct AggregationArgumentContext
{
/* immutable fields */
int argumentCount;
bool isTuple;
TupleDesc tupleDesc;
/* mutable fields */
HeapTuple tuple;
Datum *values;
bool *nulls;
} AggregationArgumentContext;
/*
* internal type for support aggregates to pass transition state alongside
* aggregation bookkeeping
@ -49,6 +70,7 @@ typedef struct StypeBox
bool transtypeByVal;
bool valueNull;
bool valueInit;
AggregationArgumentContext *aggregationArgumentContext;
} StypeBox;
static HeapTuple GetAggregateForm(Oid oid, Form_pg_aggregate *form);
@ -57,9 +79,16 @@ static HeapTuple GetTypeForm(Oid oid, Form_pg_type *form);
static void * pallocInAggContext(FunctionCallInfo fcinfo, size_t size);
static void aclcheckAggregate(ObjectType objectType, Oid userOid, Oid funcOid);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box, HeapTuple aggTuple,
Oid transtype);
static void InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box,
HeapTuple aggTuple, Oid transtype,
AggregationArgumentContext *aggregationArgumentContext);
static StypeBox * TryCreateStypeBoxFromFcinfoAggref(FunctionCallInfo fcinfo);
static AggregationArgumentContext * CreateAggregationArgumentContext(FunctionCallInfo
fcinfo,
int argumentIndex);
static void ExtractAggregationValues(FunctionCallInfo fcinfo, int argumentIndex,
AggregationArgumentContext
*aggregationArgumentContext);
static void HandleTransition(StypeBox *box, FunctionCallInfo fcinfo,
FunctionCallInfo innerFcinfo);
static void HandleStrictUninit(StypeBox *box, FunctionCallInfo fcinfo, Datum value);
@ -174,7 +203,7 @@ GetAggInitVal(Datum textInitVal, Oid transtype)
*/
static void
InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box, HeapTuple aggTuple, Oid
transtype)
transtype, AggregationArgumentContext *aggregationArgumentContext)
{
Form_pg_aggregate aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
Oid userId = GetUserId();
@ -192,6 +221,7 @@ InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box, HeapTuple aggTuple, O
&box->valueNull);
box->transtype = transtype;
box->valueInit = !box->valueNull;
box->aggregationArgumentContext = aggregationArgumentContext;
if (box->valueNull)
{
box->value = (Datum) 0;
@ -244,13 +274,138 @@ TryCreateStypeBoxFromFcinfoAggref(FunctionCallInfo fcinfo)
StypeBox *box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = DatumGetObjectId(aggConst->constvalue);
HeapTuple aggTuple = GetAggregateForm(box->agg, &aggform);
InitializeStypeBox(fcinfo, box, aggTuple, aggform->aggtranstype);
InitializeStypeBox(fcinfo, box, aggTuple, aggform->aggtranstype, NULL);
ReleaseSysCache(aggTuple);
return box;
}
/*
* CreateAggregationArgumentContext creates an AggregationArgumentContext tailored
* to handling the aggregation of input arguments identical to type at
* 'argumentIndex' in 'fcinfo'.
*/
static AggregationArgumentContext *
CreateAggregationArgumentContext(FunctionCallInfo fcinfo, int argumentIndex)
{
AggregationArgumentContext *aggregationArgumentContext =
pallocInAggContext(fcinfo, sizeof(AggregationArgumentContext));
/* check if input comes combined into tuple/record */
if (RECORDOID == get_fn_expr_argtype(fcinfo->flinfo, argumentIndex))
{
/* initialize context to handle aggregation argument combined into tuple */
if (fcGetArgNull(fcinfo, argumentIndex))
{
ereport(ERROR, (errmsg("worker_partial_agg_sfunc: null record input"),
errhint("Elements of record may be null")));
}
/* retrieve tuple header */
HeapTupleHeader tupleHeader = PG_GETARG_HEAPTUPLEHEADER(argumentIndex);
/* extract type info from the tuple */
TupleDesc tupleDesc =
lookup_rowtype_tupdesc(HeapTupleHeaderGetTypeId(tupleHeader),
HeapTupleHeaderGetTypMod(tupleHeader));
/* create a copy we can keep */
TupleDesc tupleDescCopy = pallocInAggContext(fcinfo, TupleDescSize(tupleDesc));
TupleDescCopy(tupleDescCopy, tupleDesc);
ReleaseTupleDesc(tupleDesc);
/* build a HeapTuple control structure */
HeapTuple tuple = pallocInAggContext(fcinfo, sizeof(HeapTupleData));
ItemPointerSetInvalid(&(tuple->t_self));
tuple->t_tableOid = InvalidOid;
/* initialize context to handle multiple aggregation arguments */
aggregationArgumentContext->argumentCount = tupleDescCopy->natts;
aggregationArgumentContext->values =
pallocInAggContext(fcinfo, tupleDescCopy->natts * sizeof(Datum));
aggregationArgumentContext->nulls =
pallocInAggContext(fcinfo, tupleDescCopy->natts * sizeof(bool));
aggregationArgumentContext->isTuple = true;
aggregationArgumentContext->tupleDesc = tupleDescCopy;
aggregationArgumentContext->tuple = tuple;
}
else
{
/* initialize context to handle single aggregation argument */
aggregationArgumentContext->argumentCount = 1;
aggregationArgumentContext->values = pallocInAggContext(fcinfo, sizeof(Datum));
aggregationArgumentContext->nulls = pallocInAggContext(fcinfo, sizeof(bool));
aggregationArgumentContext->isTuple = false;
aggregationArgumentContext->tupleDesc = NULL;
aggregationArgumentContext->tuple = NULL;
}
return aggregationArgumentContext;
}
/*
* ExtractAggregationValues extracts aggregation argument values and stores them in
* the mutable fields of AggregationArgumentContext.
*/
static void
ExtractAggregationValues(FunctionCallInfo fcinfo, int argumentIndex,
AggregationArgumentContext *aggregationArgumentContext)
{
if (aggregationArgumentContext->isTuple)
{
if (fcGetArgNull(fcinfo, argumentIndex))
{
/* handle null record input */
for (int i = 0; i < aggregationArgumentContext->argumentCount; i++)
{
aggregationArgumentContext->values[i] = 0;
aggregationArgumentContext->nulls[i] = true;
}
}
else
{
/* handle tuple/record input */
HeapTupleHeader tupleHeader =
DatumGetHeapTupleHeader(fcGetArgValue(fcinfo, argumentIndex));
if (HeapTupleHeaderGetNatts(tupleHeader) !=
aggregationArgumentContext->argumentCount ||
HeapTupleHeaderGetTypeId(tupleHeader) !=
aggregationArgumentContext->tupleDesc->tdtypeid ||
HeapTupleHeaderGetTypMod(tupleHeader) !=
aggregationArgumentContext->tupleDesc->tdtypmod)
{
ereport(ERROR, (errmsg("worker_partial_agg_sfunc received "
"incompatible record")));
}
aggregationArgumentContext->tuple->t_len =
HeapTupleHeaderGetDatumLength(tupleHeader);
aggregationArgumentContext->tuple->t_data = tupleHeader;
/* break down the tuple into fields */
heap_deform_tuple(
aggregationArgumentContext->tuple,
aggregationArgumentContext->tupleDesc,
aggregationArgumentContext->values,
aggregationArgumentContext->nulls);
}
}
else
{
/* extract single argument value */
aggregationArgumentContext->values[0] = fcGetArgValue(fcinfo, argumentIndex);
aggregationArgumentContext->nulls[0] = fcGetArgNull(fcinfo, argumentIndex);
}
}
/*
* HandleTransition copies logic used in nodeAgg's advance_transition_function
* for handling result of transition function.
@ -348,13 +503,19 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
if (initialCall)
{
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("worker_partial_agg_sfunc received invalid null "
"input for second argument")));
}
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = PG_GETARG_OID(1);
box->aggregationArgumentContext = CreateAggregationArgumentContext(fcinfo, 2);
if (!TypecheckWorkerPartialAggArgType(fcinfo, box))
{
ereport(ERROR, (errmsg(
"worker_partial_agg_sfunc could not confirm type correctness")));
ereport(ERROR, (errmsg("worker_partial_agg_sfunc could not confirm type "
"correctness")));
}
}
else
@ -365,9 +526,11 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
HeapTuple aggtuple = GetAggregateForm(box->agg, &aggform);
Oid aggsfunc = aggform->aggtransfn;
if (initialCall)
{
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype);
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype,
box->aggregationArgumentContext);
}
ReleaseSysCache(aggtuple);
if (initialCall)
@ -377,12 +540,20 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
&box->transtypeByVal);
}
/*
* Get aggregation values, which may be either wrapped in a
* tuple (multi-argument case) or a singular, unwrapped value.
*/
ExtractAggregationValues(fcinfo, 2, box->aggregationArgumentContext);
fmgr_info(aggsfunc, &info);
if (info.fn_strict)
{
for (argumentIndex = 2; argumentIndex < PG_NARGS(); argumentIndex++)
for (argumentIndex = 0;
argumentIndex < box->aggregationArgumentContext->argumentCount;
argumentIndex++)
{
if (PG_ARGISNULL(argumentIndex))
if (box->aggregationArgumentContext->nulls[argumentIndex])
{
PG_RETURN_POINTER(box);
}
@ -390,7 +561,13 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
if (!box->valueInit)
{
HandleStrictUninit(box, fcinfo, PG_GETARG_DATUM(2));
/* For 'strict' transition functions, if the initial state value is null
* then the first argument value of the first row with all-nonnull input
* values replaces the state value.
*/
Datum stateValue = box->aggregationArgumentContext->values[0];
HandleStrictUninit(box, fcinfo, stateValue);
PG_RETURN_POINTER(box);
}
@ -400,13 +577,21 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
}
}
InitFunctionCallInfoData(*innerFcinfo, &info, fcinfo->nargs - 1, fcinfo->fncollation,
/* if aggregate function has N parameters, corresponding SFUNC has N+1 */
InitFunctionCallInfoData(*innerFcinfo, &info,
box->aggregationArgumentContext->argumentCount + 1,
fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, box->value, box->valueNull);
for (argumentIndex = 1; argumentIndex < innerFcinfo->nargs; argumentIndex++)
for (argumentIndex = 0;
argumentIndex < box->aggregationArgumentContext->argumentCount;
argumentIndex++)
{
fcSetArgExt(innerFcinfo, argumentIndex, fcGetArgValue(fcinfo, argumentIndex + 1),
fcGetArgNull(fcinfo, argumentIndex + 1));
fcSetArgExt(innerFcinfo, argumentIndex + 1,
box->aggregationArgumentContext->values[argumentIndex],
box->aggregationArgumentContext->nulls[argumentIndex]);
}
HandleTransition(box, fcinfo, innerFcinfo);
@ -528,7 +713,7 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
if (PG_ARGISNULL(0))
{
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype);
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype, NULL);
}
ReleaseSysCache(aggtuple);
@ -703,20 +888,49 @@ TypecheckWorkerPartialAggArgType(FunctionCallInfo fcinfo, StypeBox *box)
Assert(!argtypesNull);
ReleaseSysCache(proctuple);
if (ARR_NDIM(DatumGetArrayTypeP(argtypes)) != 1 ||
ARR_DIMS(DatumGetArrayTypeP(argtypes))[0] != 1)
if (ARR_NDIM(DatumGetArrayTypeP(argtypes)) != 1)
{
elog(ERROR, "worker_partial_agg_sfunc cannot type check aggregates "
"taking anything other than 1 argument");
"taking multi-dimensional arguments");
}
int arrayIndex = 0;
Datum argtype = array_get_element(argtypes,
1, &arrayIndex, -1, sizeof(Oid), true, 'i',
&argtypesNull);
Assert(!argtypesNull);
int aggregateArgCount = ARR_DIMS(DatumGetArrayTypeP(argtypes))[0];
return aggarg != NULL && exprType((Node *) aggarg->expr) == DatumGetObjectId(argtype);
/* we expect aggregate function to have at least a single parameter */
if (box->aggregationArgumentContext->argumentCount != aggregateArgCount)
{
return false;
}
int aggregateArgIndex = 0;
Datum argType;
if (box->aggregationArgumentContext->isTuple)
{
/* check if record element types match aggregate input parameters */
for (aggregateArgIndex = 0; aggregateArgIndex < aggregateArgCount;
aggregateArgIndex++)
{
argType = array_get_element(argtypes, 1, &aggregateArgIndex, -1, sizeof(Oid),
true, 'i', &argtypesNull);
Assert(!argtypesNull);
TupleDesc tupleDesc = box->aggregationArgumentContext->tupleDesc;
if (argType != tupleDesc->attrs[aggregateArgIndex].atttypid)
{
return false;
}
}
return true;
}
else
{
argType = array_get_element(argtypes, 1, &aggregateArgIndex, -1, sizeof(Oid),
true, 'i', &argtypesNull);
Assert(!argtypesNull);
return exprType((Node *) aggarg->expr) == DatumGetObjectId(argType);
}
}

View File

@ -51,6 +51,63 @@ select create_distributed_function('sum2_strict(int)');
(1 row)
-- user-defined aggregates with multiple-parameters
create function psum_sfunc(s int, x int, y int)
returns int immutable language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_sfunc_strict(s int, x int, y int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_combinefunc(s1 int, s2 int)
returns int immutable language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_combinefunc_strict(s1 int, s2 int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_finalfunc(x int)
returns int immutable language plpgsql as $$
begin return x * 2;
end;
$$;
create function psum_finalfunc_strict(x int)
returns int immutable strict language plpgsql as $$
begin return x * 2;
end;
$$;
create aggregate psum(int, int)(
sfunc=psum_sfunc,
combinefunc=psum_combinefunc,
finalfunc=psum_finalfunc,
stype=int
);
create aggregate psum_strict(int, int)(
sfunc=psum_sfunc_strict,
combinefunc=psum_combinefunc_strict,
finalfunc=psum_finalfunc_strict,
stype=int,
initcond=0
);
select create_distributed_function('psum(int,int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('psum_strict(int,int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- generate test data
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');
create_distributed_table
@ -59,16 +116,16 @@ select create_distributed_table('aggdata', 'id');
(1 row)
insert into aggdata (id, key, val, valf) values (1, 1, 2, 11.2), (2, 1, NULL, 2.1), (3, 2, 2, 3.22), (4, 2, 3, 4.23), (5, 2, 5, 5.25), (6, 3, 4, 63.4), (7, 5, NULL, 75), (8, 6, NULL, NULL), (9, 6, NULL, 96), (10, 7, 8, 1078), (11, 9, 0, 1.19);
select key, sum2(val), sum2_strict(val), stddev(valf) from aggdata group by key order by key;
key | sum2 | sum2_strict | stddev
select key, sum2(val), sum2_strict(val), stddev(valf)::numeric(10,5), psum(val, valf::int), psum_strict(val, valf::int) from aggdata group by key order by key;
key | sum2 | sum2_strict | stddev | psum | psum_strict
---------------------------------------------------------------------
1 | | 4 | 6.43467170879758
2 | 20 | 20 | 1.01500410508201
3 | 8 | 8 |
5 | | |
6 | | |
7 | 16 | 16 |
9 | 0 | 0 |
1 | | 4 | 6.43467 | 52 | 50
2 | 20 | 20 | 1.01500 | 104 | 104
3 | 8 | 8 | | 510 | 510
5 | | | | 2 | 0
6 | | | | 4 | 0
7 | 16 | 16 | | 17254 | 17254
9 | 0 | 0 | | 6 | 6
(7 rows)
-- FILTER supported
@ -85,46 +142,332 @@ select key, sum2(val) filter (where valf < 5), sum2_strict(val) filter (where va
(7 rows)
-- DISTINCT unsupported, unless grouped by partition key
select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key;
select key, sum2(distinct val), sum2_strict(distinct val), psum(distinct val, valf::int), psum_strict(distinct val, valf::int) from aggdata group by key order by key;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
select id, sum2(distinct val), sum2_strict(distinct val) from aggdata group by id order by id;
id | sum2 | sum2_strict
select id, sum2(distinct val), sum2_strict(distinct val), psum(distinct val, valf::int), psum_strict(distinct val, valf::int) from aggdata group by id order by id;
id | sum2 | sum2_strict | psum | psum_strict
---------------------------------------------------------------------
1 | 4 | 4
2 | |
3 | 4 | 4
4 | 6 | 6
5 | 10 | 10
6 | 8 | 8
7 | |
8 | |
9 | |
10 | 16 | 16
11 | 0 | 0
1 | 4 | 4 | 50 | 50
2 | | | 2 | 0
3 | 4 | 4 | 18 | 18
4 | 6 | 6 | 30 | 30
5 | 10 | 10 | 56 | 56
6 | 8 | 8 | 510 | 510
7 | | | 2 | 0
8 | | | 2 | 0
9 | | | 2 | 0
10 | 16 | 16 | 17254 | 17254
11 | 0 | 0 | 6 | 6
(11 rows)
-- ORDER BY unsupported
select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key;
select key, sum2(val order by valf), sum2_strict(val order by valf), psum(val, valf::int order by valf), psum_strict(val, valf::int order by valf) from aggdata group by key order by key;
ERROR: unsupported aggregate function sum2
-- Test handling a lack of intermediate results
select sum2(val), sum2_strict(val) from aggdata where valf = 0;
sum2 | sum2_strict
select sum2(val), sum2_strict(val), psum(val, valf::int), psum_strict(val, valf::int) from aggdata where valf = 0;
sum2 | sum2_strict | psum | psum_strict
---------------------------------------------------------------------
0 |
0 | | 0 | 0
(1 row)
-- Test HAVING
select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key;
key | stddev
select key, stddev(valf)::numeric(10,5) from aggdata group by key having stddev(valf) > 2 order by key;
key | stddev
---------------------------------------------------------------------
1 | 6.43467170879758
1 | 6.43467
(1 row)
select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key;
key | stddev
select key, stddev(valf)::numeric(10,5) from aggdata group by key having stddev(val::float8) > 1 order by key;
key | stddev
---------------------------------------------------------------------
2 | 1.01500410508201
2 | 1.01500
(1 row)
select key, corr(valf,valf+val)::numeric(10,5) from aggdata group by key having corr(valf,valf+val) < 1 order by key;
key | corr
---------------------------------------------------------------------
2 | 0.99367
(1 row)
select key, corr(valf,valf+val)::numeric(10,5) from aggdata group by key having corr(valf::float8,valf+val) < 1 order by key;
key | corr
---------------------------------------------------------------------
2 | 0.99367
(1 row)
-- Previously aggregated on master, pushed down to workers with multi-parameter support
select floor(val/2), corr(valf, valf + val)::numeric(10,5) from aggdata group by floor(val/2) order by 1;
floor | corr
---------------------------------------------------------------------
0 |
1 | 0.99181
2 | 1.00000
4 |
|
(5 rows)
select floor(val/2), corr(valf, valf + val)::numeric(10,5) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
floor | corr
---------------------------------------------------------------------
1 | 0.99181
2 | 1.00000
(2 rows)
-- built-in binary aggregates for statistics
select regr_count(valf,val)::numeric(10,5)from aggdata;
regr_count
---------------------------------------------------------------------
7.00000
(1 row)
select regr_sxx(valf,val)::numeric(10,5) from aggdata;
regr_sxx
---------------------------------------------------------------------
39.71429
(1 row)
select regr_syy(valf,val)::numeric(10,3) from aggdata;
regr_syy
---------------------------------------------------------------------
971900.680
(1 row)
select regr_sxy(valf,val)::numeric(10,5) from aggdata;
regr_sxy
---------------------------------------------------------------------
4945.98571
(1 row)
select regr_avgx(valf,val)::numeric(10,5), regr_avgy(valf,val)::numeric(10,5) from aggdata;
regr_avgx | regr_avgy
---------------------------------------------------------------------
3.42857 | 166.64143
(1 row)
select regr_r2(valf,val)::numeric(10,5) from aggdata;
regr_r2
---------------------------------------------------------------------
0.63378
(1 row)
select regr_slope(valf,val)::numeric(10,5), regr_intercept(valf,val)::numeric(10,5) from aggdata;
regr_slope | regr_intercept
---------------------------------------------------------------------
124.53921 | -260.35014
(1 row)
select covar_pop(valf,val)::numeric(10,5), covar_samp(valf,val)::numeric(10,5) from aggdata;
covar_pop | covar_samp
---------------------------------------------------------------------
706.56939 | 824.33095
(1 row)
-- binary string aggregation
create function binstragg_sfunc(s text, e1 text, e2 text)
returns text immutable language plpgsql as $$
begin case when coalesce(e1,'') > coalesce(s,'') and coalesce(e1,'') > coalesce(e2,'') then return e1;
when coalesce(e2,'') > coalesce(s,'') and coalesce(e2,'') > coalesce(e1,'') then return e2;
else return s;
end case;
end;
$$;
create function binstragg_combinefunc(s1 text, s2 text)
returns text immutable language plpgsql as $$
begin if coalesce(s1,'') > coalesce(s2,'') then return s1; else return s2; end if;
end;
$$;
create aggregate binstragg(text, text)(
sfunc=binstragg_sfunc,
combinefunc=binstragg_combinefunc,
stype=text
);
select create_distributed_function('binstragg(text,text)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
create table txttbl(id int, col1 text, col2 text);
select create_distributed_table('txttbl', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
insert into txttbl values (1, 'aaaa', 'bbbb'), (2, 'cccc', 'dddd'), (3, 'eeee', 'ffff'), (4, 'gggg', 'hhhh'), (5, 'iiii', 'jjjj'), (6, 'kkkk', 'llll'), (7, 'mmmm', 'nnnn'), (8, 'oooo', 'pppp'), (9, 'qqqq', 'rrrr'), (10, 'ssss', 'tttt'), (11, 'uuuu', 'vvvv'), (12, 'wwww', 'xxxx'), (13, 'yyyy', 'zzzz');
select binstragg(col1, col2) from txttbl;
binstragg
---------------------------------------------------------------------
zzzz
(1 row)
create table users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
select create_distributed_table('users_table', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create table events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
select create_distributed_table('events_table', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
insert into users_table select i % 10000, timestamp '2014-01-10 20:00:00' +
i * (timestamp '2014-01-20 20:00:00' -
timestamp '2014-01-10 10:00:00'),i, i % 100, i % 5 from generate_series(0, 1000) i;
insert into events_table select i % 10000, timestamp '2014-01-10 20:00:00' +
i * (timestamp '2014-01-20 20:00:00' -
timestamp '2014-01-10 10:00:00'),i, i % 100, i % 5 from generate_series(0, 10000) i;
-- query with window functions, the agg. inside the window functions
select value_3, to_char(date_trunc('day', time), 'YYYYMMDD') as time, rank() over my_win as my_rank
from events_table
group by
value_3, date_trunc('day', time)
WINDOW my_win as (partition by regr_syy(event_type%10, value_2)::int order by count(*) desc)
order by 1,2,3
limit 5;
value_3 | time | my_rank
---------------------------------------------------------------------
0 | 20140110 | 1
0 | 20140303 | 1
0 | 20140425 | 1
0 | 20140616 | 1
0 | 20140807 | 1
(5 rows)
-- query with window functions, the agg. outside the window functions
select regr_syy(event_type%10, value_2)::int, value_3, to_char(date_trunc('day', time), 'YYYYMMDD') as time, rank() over my_win as my_rank
from events_table
group by
value_3, date_trunc('day', time)
WINDOW my_win as (partition by value_3 order by count(*) desc)
order by 1,2,3
limit 5;
regr_syy | value_3 | time | my_rank
---------------------------------------------------------------------
0 | 0 | 20140110 | 1
0 | 0 | 20140303 | 1
0 | 0 | 20140425 | 1
0 | 0 | 20140616 | 1
0 | 0 | 20140807 | 1
(5 rows)
-- query with only order by
select regr_syy(event_type%10, value_2)::int
from events_table
order by 1 desc;
regr_syy
---------------------------------------------------------------------
82520
(1 row)
-- query with group by + target list + order by
select count(*), regr_syy(event_type%10, value_2)::int
from events_table
group by value_3
order by 2 desc;
count | regr_syy
---------------------------------------------------------------------
2001 | 12506
2000 | 12500
2000 | 12500
2000 | 12500
2000 | 12500
(5 rows)
-- query with group by + order by
select count(*)
from events_table
group by value_3
order by regr_syy(event_type%10, value_2)::int desc;
count
---------------------------------------------------------------------
2001
2000
2000
2000
2000
(5 rows)
-- query with basic join
select regr_syy(u1.user_id, u2.user_id)::int
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
regr_syy
---------------------------------------------------------------------
83833250
(1 row)
-- agg. with filter with columns
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
regr_syy
---------------------------------------------------------------------
13.333
(1 row)
-- agg with filter and group by
select regr_syy(u1.user_id, u2.user_id) filter (where (u1.value_1) < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id
group by u1.value_3;
regr_syy
---------------------------------------------------------------------
0.000
0.000
0.000
0.000
0.000
(5 rows)
-- agg. with filter with consts
select regr_syy(u1.user_id, u2.user_id) filter (where '0300030' LIKE '%3%')::int
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
regr_syy
---------------------------------------------------------------------
83833250
(1 row)
-- multiple aggs with filters
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
regr_syy | regr_syy
---------------------------------------------------------------------
13.333 | 13.333
(1 row)
-- query with where false
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where 1=0;
regr_syy | regr_syy
---------------------------------------------------------------------
|
(1 row)
-- a CTE forced to be planned recursively (via OFFSET 0)
with cte_1 as
(
select
regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id
OFFSET 0
)
select
*
from
cte_1;
regr_syy | regr_syy
---------------------------------------------------------------------
13.333 | 13.333
(1 row)
-- Test https://github.com/citusdata/citus/issues/3446
@ -363,23 +706,6 @@ select key, percentile_cont(key/10.0) within group(order by val) from aggdata gr
9 | 0
(7 rows)
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1;
floor | corr
---------------------------------------------------------------------
0 |
1 | 0.991808518376741
2 | 1
4 |
|
(5 rows)
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
floor | corr
---------------------------------------------------------------------
1 | 0.991808518376741
2 | 1
(2 rows)
select array_agg(val order by valf) from aggdata;
array_agg
---------------------------------------------------------------------
@ -459,7 +785,7 @@ create table nulltable(id int);
insert into nulltable values (0);
-- These cases are not type correct
select pg_catalog.worker_partial_agg('string_agg(text,text)'::regprocedure, id) from nulltable;
ERROR: worker_partial_agg_sfunc cannot type check aggregates taking anything other than 1 argument
ERROR: worker_partial_agg_sfunc could not confirm type correctness
select pg_catalog.worker_partial_agg('sum(int8)'::regprocedure, id) from nulltable;
ERROR: worker_partial_agg_sfunc could not confirm type correctness
select pg_catalog.coord_combine_agg('sum(float8)'::regprocedure, id::text::cstring, null::text) from nulltable;

View File

@ -50,23 +50,222 @@ create aggregate sum2_strict (int) (
select create_distributed_function('sum2(int)');
select create_distributed_function('sum2_strict(int)');
-- user-defined aggregates with multiple-parameters
create function psum_sfunc(s int, x int, y int)
returns int immutable language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_sfunc_strict(s int, x int, y int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_combinefunc(s1 int, s2 int)
returns int immutable language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_combinefunc_strict(s1 int, s2 int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_finalfunc(x int)
returns int immutable language plpgsql as $$
begin return x * 2;
end;
$$;
create function psum_finalfunc_strict(x int)
returns int immutable strict language plpgsql as $$
begin return x * 2;
end;
$$;
create aggregate psum(int, int)(
sfunc=psum_sfunc,
combinefunc=psum_combinefunc,
finalfunc=psum_finalfunc,
stype=int
);
create aggregate psum_strict(int, int)(
sfunc=psum_sfunc_strict,
combinefunc=psum_combinefunc_strict,
finalfunc=psum_finalfunc_strict,
stype=int,
initcond=0
);
select create_distributed_function('psum(int,int)');
select create_distributed_function('psum_strict(int,int)');
-- generate test data
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');
insert into aggdata (id, key, val, valf) values (1, 1, 2, 11.2), (2, 1, NULL, 2.1), (3, 2, 2, 3.22), (4, 2, 3, 4.23), (5, 2, 5, 5.25), (6, 3, 4, 63.4), (7, 5, NULL, 75), (8, 6, NULL, NULL), (9, 6, NULL, 96), (10, 7, 8, 1078), (11, 9, 0, 1.19);
select key, sum2(val), sum2_strict(val), stddev(valf) from aggdata group by key order by key;
select key, sum2(val), sum2_strict(val), stddev(valf)::numeric(10,5), psum(val, valf::int), psum_strict(val, valf::int) from aggdata group by key order by key;
-- FILTER supported
select key, sum2(val) filter (where valf < 5), sum2_strict(val) filter (where valf < 5) from aggdata group by key order by key;
-- DISTINCT unsupported, unless grouped by partition key
select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key;
select id, sum2(distinct val), sum2_strict(distinct val) from aggdata group by id order by id;
select key, sum2(distinct val), sum2_strict(distinct val), psum(distinct val, valf::int), psum_strict(distinct val, valf::int) from aggdata group by key order by key;
select id, sum2(distinct val), sum2_strict(distinct val), psum(distinct val, valf::int), psum_strict(distinct val, valf::int) from aggdata group by id order by id;
-- ORDER BY unsupported
select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key;
select key, sum2(val order by valf), sum2_strict(val order by valf), psum(val, valf::int order by valf), psum_strict(val, valf::int order by valf) from aggdata group by key order by key;
-- Test handling a lack of intermediate results
select sum2(val), sum2_strict(val) from aggdata where valf = 0;
select sum2(val), sum2_strict(val), psum(val, valf::int), psum_strict(val, valf::int) from aggdata where valf = 0;
-- Test HAVING
select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key;
select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key;
select key, stddev(valf)::numeric(10,5) from aggdata group by key having stddev(valf) > 2 order by key;
select key, stddev(valf)::numeric(10,5) from aggdata group by key having stddev(val::float8) > 1 order by key;
select key, corr(valf,valf+val)::numeric(10,5) from aggdata group by key having corr(valf,valf+val) < 1 order by key;
select key, corr(valf,valf+val)::numeric(10,5) from aggdata group by key having corr(valf::float8,valf+val) < 1 order by key;
-- Previously aggregated on master, pushed down to workers with multi-parameter support
select floor(val/2), corr(valf, valf + val)::numeric(10,5) from aggdata group by floor(val/2) order by 1;
select floor(val/2), corr(valf, valf + val)::numeric(10,5) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
-- built-in binary aggregates for statistics
select regr_count(valf,val)::numeric(10,5)from aggdata;
select regr_sxx(valf,val)::numeric(10,5) from aggdata;
select regr_syy(valf,val)::numeric(10,3) from aggdata;
select regr_sxy(valf,val)::numeric(10,5) from aggdata;
select regr_avgx(valf,val)::numeric(10,5), regr_avgy(valf,val)::numeric(10,5) from aggdata;
select regr_r2(valf,val)::numeric(10,5) from aggdata;
select regr_slope(valf,val)::numeric(10,5), regr_intercept(valf,val)::numeric(10,5) from aggdata;
select covar_pop(valf,val)::numeric(10,5), covar_samp(valf,val)::numeric(10,5) from aggdata;
-- binary string aggregation
create function binstragg_sfunc(s text, e1 text, e2 text)
returns text immutable language plpgsql as $$
begin case when coalesce(e1,'') > coalesce(s,'') and coalesce(e1,'') > coalesce(e2,'') then return e1;
when coalesce(e2,'') > coalesce(s,'') and coalesce(e2,'') > coalesce(e1,'') then return e2;
else return s;
end case;
end;
$$;
create function binstragg_combinefunc(s1 text, s2 text)
returns text immutable language plpgsql as $$
begin if coalesce(s1,'') > coalesce(s2,'') then return s1; else return s2; end if;
end;
$$;
create aggregate binstragg(text, text)(
sfunc=binstragg_sfunc,
combinefunc=binstragg_combinefunc,
stype=text
);
select create_distributed_function('binstragg(text,text)');
create table txttbl(id int, col1 text, col2 text);
select create_distributed_table('txttbl', 'id');
insert into txttbl values (1, 'aaaa', 'bbbb'), (2, 'cccc', 'dddd'), (3, 'eeee', 'ffff'), (4, 'gggg', 'hhhh'), (5, 'iiii', 'jjjj'), (6, 'kkkk', 'llll'), (7, 'mmmm', 'nnnn'), (8, 'oooo', 'pppp'), (9, 'qqqq', 'rrrr'), (10, 'ssss', 'tttt'), (11, 'uuuu', 'vvvv'), (12, 'wwww', 'xxxx'), (13, 'yyyy', 'zzzz');
select binstragg(col1, col2) from txttbl;
create table users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
select create_distributed_table('users_table', 'user_id');
create table events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint);
select create_distributed_table('events_table', 'user_id');
insert into users_table select i % 10000, timestamp '2014-01-10 20:00:00' +
i * (timestamp '2014-01-20 20:00:00' -
timestamp '2014-01-10 10:00:00'),i, i % 100, i % 5 from generate_series(0, 1000) i;
insert into events_table select i % 10000, timestamp '2014-01-10 20:00:00' +
i * (timestamp '2014-01-20 20:00:00' -
timestamp '2014-01-10 10:00:00'),i, i % 100, i % 5 from generate_series(0, 10000) i;
-- query with window functions, the agg. inside the window functions
select value_3, to_char(date_trunc('day', time), 'YYYYMMDD') as time, rank() over my_win as my_rank
from events_table
group by
value_3, date_trunc('day', time)
WINDOW my_win as (partition by regr_syy(event_type%10, value_2)::int order by count(*) desc)
order by 1,2,3
limit 5;
-- query with window functions, the agg. outside the window functions
select regr_syy(event_type%10, value_2)::int, value_3, to_char(date_trunc('day', time), 'YYYYMMDD') as time, rank() over my_win as my_rank
from events_table
group by
value_3, date_trunc('day', time)
WINDOW my_win as (partition by value_3 order by count(*) desc)
order by 1,2,3
limit 5;
-- query with only order by
select regr_syy(event_type%10, value_2)::int
from events_table
order by 1 desc;
-- query with group by + target list + order by
select count(*), regr_syy(event_type%10, value_2)::int
from events_table
group by value_3
order by 2 desc;
-- query with group by + order by
select count(*)
from events_table
group by value_3
order by regr_syy(event_type%10, value_2)::int desc;
-- query with basic join
select regr_syy(u1.user_id, u2.user_id)::int
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
-- agg. with filter with columns
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
-- agg with filter and group by
select regr_syy(u1.user_id, u2.user_id) filter (where (u1.value_1) < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id
group by u1.value_3;
-- agg. with filter with consts
select regr_syy(u1.user_id, u2.user_id) filter (where '0300030' LIKE '%3%')::int
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
-- multiple aggs with filters
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id;
-- query with where false
select regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where 1=0;
-- a CTE forced to be planned recursively (via OFFSET 0)
with cte_1 as
(
select
regr_syy(u1.user_id, u2.user_id) filter (where u1.value_1 < 5)::numeric(10,3), regr_syy(u1.value_1, u2.value_2) filter (where u1.user_id < 5)::numeric(10,3)
from users_table u1, events_table u2
where u1.user_id = u2.user_id
OFFSET 0
)
select
*
from
cte_1;
-- Test https://github.com/citusdata/citus/issues/3446
set citus.coordinator_aggregation_strategy to 'row-gather';
@ -175,8 +374,6 @@ select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text)
select mode() within group (order by floor(val/2)) from aggdata;
select percentile_cont(0.5) within group(order by valf) from aggdata;
select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key;
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1;
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
select array_agg(val order by valf) from aggdata;
-- Test TransformSubqueryNode