mirror of https://github.com/citusdata/citus.git
Initial custom aggregates. Very WIP. Only a branch because I like backups. Will squash someday
parent
c32bd459f4
commit
40a5e3128b
|
@ -0,0 +1,62 @@
|
||||||
|
CREATE FUNCTION stype_serialize(internal, oid, ...)serial
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION stype_deserialize(internal, oid, ...)serial
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION stype_combine(internal, oid, ...)serial
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION worker_partial_agg_sfunc(internal, oid, ...)
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION worker_partial_agg_ffunc(internal, oid, ...)
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION coord_combine_agg_sfunc(internal, oid, ...)
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
CREATE FUNCTION coord_combine_agg_ffunc(internal, oid, ...)
|
||||||
|
RETURNS internal
|
||||||
|
AS 'MODULE_PATHNAME'
|
||||||
|
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
-- select worker_partial_agg(agg, ...)
|
||||||
|
-- equivalent to
|
||||||
|
-- select serialize_stype(agg_without_ffunc(...))
|
||||||
|
CREATE AGGREGATE worker_partial_agg(oid, ...) (
|
||||||
|
STYPE = internal,
|
||||||
|
SFUNC = worker_partial_agg_sfunc,
|
||||||
|
FINALFUNC = worker_partial_agg_ffunc,
|
||||||
|
FINALFUNC_EXTRA,
|
||||||
|
COMBINEFUNC = stypebox_combine,
|
||||||
|
SERIALFUNC = stypebox_serialize,
|
||||||
|
DESERIALFUNC = stypebox_deserialize,
|
||||||
|
PARALLEL = SAFE
|
||||||
|
)
|
||||||
|
|
||||||
|
-- select coord_combine_agg(agg, col)
|
||||||
|
-- equivalent to
|
||||||
|
-- select agg_ffunc(agg_combine(col))
|
||||||
|
CREATE AGGREGATE coord_combine_agg(oid, ...) (
|
||||||
|
STYPE = internal,
|
||||||
|
SFUNC = coord_combine_sfunc,
|
||||||
|
FINALFUNC = coord_combine_agg_ffunc,
|
||||||
|
FINALFUNC_EXTRA,
|
||||||
|
COMBINEFUNC = stypebox_combine,
|
||||||
|
SERIALFUNC = stypebox_serialize,
|
||||||
|
DESERIALFUNC = stypebox_deserialize,
|
||||||
|
PARALLEL = SAFE
|
||||||
|
)
|
|
@ -1508,8 +1508,28 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
const Index columnLevelsUp = 0; /* normal column */
|
const Index columnLevelsUp = 0; /* normal column */
|
||||||
const AttrNumber argumentId = 1; /* our aggregates have single arguments */
|
const AttrNumber argumentId = 1; /* our aggregates have single arguments */
|
||||||
AggClauseCosts aggregateCosts;
|
AggClauseCosts aggregateCosts;
|
||||||
|
HeapTuple aggTuple;
|
||||||
|
Form_pg_aggregate aggform;
|
||||||
|
Oid combinefn;
|
||||||
|
Oid serialfn = InvalidOid;
|
||||||
|
Oid deserialfn = InvalidOid;
|
||||||
|
|
||||||
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
|
aggTuple = SearchSysCache1(AGGFNOID,
|
||||||
|
ObjectIdGetDatum(originalAggregate->aggfnoid));
|
||||||
|
if (!HeapTupleIsValid(aggTuple))
|
||||||
|
elog(ERROR, "cache lookup failed for aggregate %u",
|
||||||
|
originalAggregate->aggfnoid);
|
||||||
|
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
|
||||||
|
|
||||||
|
/* planner recorded transition state type in the Aggref itself */
|
||||||
|
combinefn = aggform->aggcombinefn;
|
||||||
|
|
||||||
|
if (combinefn != InvalidOid) {
|
||||||
|
if (originalAggregate->aggtranstype == INTERNALOID) {
|
||||||
|
serialfn = aggform->aggserialfn;
|
||||||
|
deserialfn = aggform->aggdeserialfn;
|
||||||
|
}
|
||||||
|
} else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
|
||||||
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
|
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
|
||||||
walkerContext->pullDistinctColumns)
|
walkerContext->pullDistinctColumns)
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "utils/fmgr.h"
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(stypebox_serialize);
|
||||||
|
PG_FUNCTION_INFO_V1(stypebox_deserialize);
|
||||||
|
PG_FUNCTION_INFO_V1(stypebox_combine);
|
||||||
|
PG_FUNCTION_INFO_V1(worker_partial_agg_sfunc);
|
||||||
|
PG_FUNCTION_INFO_V1(worker_partial_agg_ffunc);
|
||||||
|
PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
|
||||||
|
PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
|
||||||
|
|
||||||
|
typedef struct StypeBox {
|
||||||
|
Datum value;
|
||||||
|
bool value_null;
|
||||||
|
Oid agg;
|
||||||
|
} StypeBox;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (box) -> bytea
|
||||||
|
* return bytes(box.agg.name, box.agg.serial(box.value))
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
stypebox_serialize(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (bytea, internal) -> box
|
||||||
|
* box->agg = readagg(bytea)
|
||||||
|
* box->value = agg.deserial(readrest(bytea))
|
||||||
|
* return box
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
stypebox_deserialize(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (box1, box2) -> box
|
||||||
|
* box1.value = box.agg.combine(box1.value, box2.value)
|
||||||
|
* return box
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
stypebox_combine(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
StypeBox *box1 = NULL;
|
||||||
|
StypeBox *box2 = NULL;
|
||||||
|
Oid aggOid;
|
||||||
|
if (!PG_ISARGNULL(0))
|
||||||
|
{
|
||||||
|
box1 = PG_GETARG_POINTER(0);
|
||||||
|
}
|
||||||
|
if (!PG_ISARGNULL(1))
|
||||||
|
{
|
||||||
|
box2 = PG_GETARG_POINTER(1);
|
||||||
|
}
|
||||||
|
if (box1 == NULL)
|
||||||
|
{
|
||||||
|
if (box2 == NULL)
|
||||||
|
{
|
||||||
|
PG_RETURN_NULL();
|
||||||
|
}
|
||||||
|
box1 = palloc(sizeof(StypeBox));
|
||||||
|
box1->value = (Datum) 0;
|
||||||
|
box1->value_null = true;
|
||||||
|
box1->agg = box2->agg;
|
||||||
|
}
|
||||||
|
// TODO
|
||||||
|
// box1.agg = box1.agg.combine(box1.value, box2.value)
|
||||||
|
PG_RETURN_POINTER(box1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (box, agg, ...) -> box
|
||||||
|
* box.agg = agg;
|
||||||
|
* box.value = agg.sfunc(box.value, ...);
|
||||||
|
* return box
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
StypeBox *box;
|
||||||
|
int i;
|
||||||
|
if (PG_ARGISNULL(0)) {
|
||||||
|
box = palloc(sizeof(StypeBox));
|
||||||
|
box->agg = PG_GETARG_OID(1);
|
||||||
|
box->value = (Datum) 0;
|
||||||
|
box->value_null = true;
|
||||||
|
} else {
|
||||||
|
box = PG_GETARG_POINTER(0);
|
||||||
|
Assert(box->agg == PG_GETARG_OID(1));
|
||||||
|
}
|
||||||
|
FmgrInfo info;
|
||||||
|
fmgr_info(box->agg, &info);
|
||||||
|
FunctionCallInfo inner_fcinfo;
|
||||||
|
InitFunctionCallInfoData(&inner_fcinfo, &info, fcinfo->nargs - 1, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
||||||
|
// TODO if strict, deal with it
|
||||||
|
// Deal with memory management juggling (see executor/nodeAgg)
|
||||||
|
inner_fcinfo.arg[0] = box->value;
|
||||||
|
inner_fcinfo.argnull[0] = box->value_null;
|
||||||
|
memcpy(&inner_fcinfo.arg[1], &fcinfo.arg[2], sizeof(Datum) * (inner_fcinfo.nargs - 1));
|
||||||
|
memcpy(&inner_fcinfo.argnull[1], &fcinfo.argnull[2], sizeof(bool) * (inner_fcinfo.nargs - 1));
|
||||||
|
box->value = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
box->value_null = inner_fcinfo.isnull;
|
||||||
|
PG_RETURN_POINTER(box);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (box) -> box.agg.stype
|
||||||
|
* return box.agg.serialize(box.value)
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (box, agg, valbytes) -> box
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
}
|
Loading…
Reference in New Issue