From 40a5e3128b492759828d74edd2d64d347ceeed84 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 13 Jun 2019 11:24:40 -0700
Subject: [PATCH] Initial custom aggregates. Very WIP. Only a branch because I like backups. Will squash someday

---
 .../distributed/citus--8.4-1--8.4-2.sql       |  62 +++++
 .../planner/multi_logical_optimizer.c         |  36 ++-
 .../distributed/utils/aggregate_utils.c       | 224 ++++++++++++++++++
 3 files changed, 321 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/distributed/citus--8.4-1--8.4-2.sql
 create mode 100644 src/backend/distributed/utils/aggregate_utils.c

diff --git a/src/backend/distributed/citus--8.4-1--8.4-2.sql b/src/backend/distributed/citus--8.4-1--8.4-2.sql
new file mode 100644
index 000000000..9449077cf
--- /dev/null
+++ b/src/backend/distributed/citus--8.4-1--8.4-2.sql
@@ -0,0 +1,62 @@
+CREATE FUNCTION stypebox_serialize(internal)
+RETURNS bytea
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION stypebox_deserialize(bytea, internal)
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION stypebox_combine(internal, internal)
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION worker_partial_agg_sfunc(internal, oid, VARIADIC "any")
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION worker_partial_agg_ffunc(internal, oid, VARIADIC "any")
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION coord_combine_agg_sfunc(internal, oid, VARIADIC "any")
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
+
+CREATE FUNCTION coord_combine_agg_ffunc(internal, oid, VARIADIC "any")
+RETURNS internal
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE PARALLEL SAFE;
+
+-- select worker_partial_agg(agg, ...)
+-- equivalent to
+-- select serialize_stype(agg_without_ffunc(...))
+CREATE AGGREGATE worker_partial_agg(oid, VARIADIC "any") (
+    STYPE = internal,
+    SFUNC = worker_partial_agg_sfunc,
+    FINALFUNC = worker_partial_agg_ffunc,
+    FINALFUNC_EXTRA,
+    COMBINEFUNC = stypebox_combine,
+    SERIALFUNC = stypebox_serialize,
+    DESERIALFUNC = stypebox_deserialize,
+    PARALLEL = SAFE
+);
+
+-- select coord_combine_agg(agg, col)
+-- equivalent to
+-- select agg_ffunc(agg_combine(col))
+CREATE AGGREGATE coord_combine_agg(oid, VARIADIC "any") (
+    STYPE = internal,
+    SFUNC = coord_combine_agg_sfunc,
+    FINALFUNC = coord_combine_agg_ffunc,
+    FINALFUNC_EXTRA,
+    COMBINEFUNC = stypebox_combine,
+    SERIALFUNC = stypebox_serialize,
+    DESERIALFUNC = stypebox_deserialize,
+    PARALLEL = SAFE
+);
diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 948c10fae..f7da0ff05 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -1508,8 +1508,42 @@ MasterAggregateExpression(Aggref *originalAggregate,
 	const Index columnLevelsUp = 0; /* normal column */
 	const AttrNumber argumentId = 1; /* our aggregates have single arguments */
 	AggClauseCosts aggregateCosts;
+	HeapTuple aggTuple;
+	Form_pg_aggregate aggform;
+	Oid combinefn;
+	Oid serialfn = InvalidOid;
+	Oid deserialfn = InvalidOid;
 
-	if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
+	aggTuple = SearchSysCache1(AGGFNOID,
+							   ObjectIdGetDatum(originalAggregate->aggfnoid));
+	if (!HeapTupleIsValid(aggTuple))
+	{
+		elog(ERROR, "cache lookup failed for aggregate %u",
+			 originalAggregate->aggfnoid);
+	}
+	aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+	/*
+	 * Copy what we need out of the catalog tuple and release it right away so
+	 * the syscache reference is not leaked. The planner already recorded the
+	 * transition state type in the Aggref itself.
+	 */
+	combinefn = aggform->aggcombinefn;
+	if (OidIsValid(combinefn) && originalAggregate->aggtranstype == INTERNALOID)
+	{
+		serialfn = aggform->aggserialfn;
+		deserialfn = aggform->aggdeserialfn;
+	}
+	ReleaseSysCache(aggTuple);
+
+	if (OidIsValid(combinefn))
+	{
+		/*
+		 * TODO(WIP): nothing is built for this case yet, so every aggregate
+		 * with a combine function currently skips the branches below.
+		 */
+	}
+	else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
 		CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
 		walkerContext->pullDistinctColumns)
 	{
diff --git a/src/backend/distributed/utils/aggregate_utils.c b/src/backend/distributed/utils/aggregate_utils.c
new file mode 100644
index 000000000..54efdce30
--- /dev/null
+++ b/src/backend/distributed/utils/aggregate_utils.c
@@ -0,0 +1,224 @@
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "catalog/pg_aggregate.h"
+#include "fmgr.h"
+#include "utils/syscache.h"
+
+PG_FUNCTION_INFO_V1(stypebox_serialize);
+PG_FUNCTION_INFO_V1(stypebox_deserialize);
+PG_FUNCTION_INFO_V1(stypebox_combine);
+PG_FUNCTION_INFO_V1(worker_partial_agg_sfunc);
+PG_FUNCTION_INFO_V1(worker_partial_agg_ffunc);
+PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
+PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
+
+/*
+ * StypeBox wraps the transition state of an arbitrary aggregate together with
+ * the oid of that aggregate, so the generic functions below can tell which
+ * aggregate a state belongs to.
+ */
+typedef struct StypeBox
+{
+	Datum value;     /* wrapped transition state */
+	bool value_null; /* true when value holds no state */
+	Oid agg;         /* oid of the wrapped aggregate */
+} StypeBox;
+
+/*
+ * ErrorNotImplemented raises an error for entry points that are still stubs,
+ * so calling them fails cleanly instead of returning an undefined Datum.
+ */
+static void
+ErrorNotImplemented(const char *functionName)
+{
+	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("%s is not implemented yet", functionName)));
+}
+
+/*
+ * (box) -> bytea
+ * return bytes(box.agg.name, box.agg.serial(box.value))
+ */
+Datum
+stypebox_serialize(PG_FUNCTION_ARGS)
+{
+	ErrorNotImplemented("stypebox_serialize");
+	PG_RETURN_NULL();
+}
+
+/*
+ * (bytea, internal) -> box
+ * box->agg = readagg(bytea)
+ * box->value = agg.deserial(readrest(bytea))
+ * return box
+ */
+Datum
+stypebox_deserialize(PG_FUNCTION_ARGS)
+{
+	ErrorNotImplemented("stypebox_deserialize");
+	PG_RETURN_NULL();
+}
+
+/*
+ * (box1, box2) -> box
+ * box1.value = box.agg.combine(box1.value, box2.value)
+ * return box
+ */
+Datum
+stypebox_combine(PG_FUNCTION_ARGS)
+{
+	StypeBox *box1 = NULL;
+	StypeBox *box2 = NULL;
+	MemoryContext aggregateContext = NULL;
+
+	if (!AggCheckCallContext(fcinfo, &aggregateContext))
+	{
+		elog(ERROR, "stypebox_combine called in non-aggregate context");
+	}
+
+	if (!PG_ARGISNULL(0))
+	{
+		box1 = (StypeBox *) PG_GETARG_POINTER(0);
+	}
+	if (!PG_ARGISNULL(1))
+	{
+		box2 = (StypeBox *) PG_GETARG_POINTER(1);
+	}
+
+	if (box2 == NULL)
+	{
+		/* nothing to fold in, hand the first state back unchanged */
+		if (box1 == NULL)
+		{
+			PG_RETURN_NULL();
+		}
+		PG_RETURN_POINTER(box1);
+	}
+
+	if (box1 == NULL)
+	{
+		/* allocate in the aggregate context so the state outlives this call */
+		box1 = MemoryContextAlloc(aggregateContext, sizeof(StypeBox));
+		box1->value = (Datum) 0;
+		box1->value_null = true;
+		box1->agg = box2->agg;
+	}
+
+	/*
+	 * TODO: box1.value = box1.agg.combine(box1.value, box2.value)
+	 * Until that exists, fail loudly instead of silently dropping box2.
+	 */
+	ErrorNotImplemented("stypebox_combine");
+	PG_RETURN_POINTER(box1);
+}
+
+/*
+ * (box, agg, ...) -> box
+ * box.agg = agg;
+ * box.value = agg.sfunc(box.value, ...);
+ * return box
+ */
+Datum
+worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
+{
+	StypeBox *box = NULL;
+	MemoryContext aggregateContext = NULL;
+	HeapTuple aggTuple = NULL;
+	Oid transitionFunctionId = InvalidOid;
+	FmgrInfo info;
+	FunctionCallInfoData innerFcinfo;
+	int innerArgCount = PG_NARGS() - 1;
+
+	if (!AggCheckCallContext(fcinfo, &aggregateContext))
+	{
+		elog(ERROR, "worker_partial_agg_sfunc called in non-aggregate context");
+	}
+	if (PG_ARGISNULL(1))
+	{
+		ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+						errmsg("aggregate oid must not be null")));
+	}
+
+	if (PG_ARGISNULL(0))
+	{
+		/* the box is the transition state, so it must survive across rows */
+		box = MemoryContextAlloc(aggregateContext, sizeof(StypeBox));
+		box->agg = PG_GETARG_OID(1);
+		box->value = (Datum) 0;
+		box->value_null = true;
+	}
+	else
+	{
+		box = (StypeBox *) PG_GETARG_POINTER(0);
+		Assert(box->agg == PG_GETARG_OID(1));
+	}
+
+	/* box->agg is the aggregate itself; look up its transition function */
+	aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(box->agg));
+	if (!HeapTupleIsValid(aggTuple))
+	{
+		elog(ERROR, "cache lookup failed for aggregate %u", box->agg);
+	}
+	transitionFunctionId = ((Form_pg_aggregate) GETSTRUCT(aggTuple))->aggtransfn;
+	ReleaseSysCache(aggTuple);
+
+	fmgr_info(transitionFunctionId, &info);
+	if (info.fn_strict)
+	{
+		/* TODO: emulate nodeAgg's handling of strict transition functions */
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("strict transition functions are not supported yet")));
+	}
+
+	/* inner call is (state, args...): the outer call minus the aggregate oid */
+	InitFunctionCallInfoData(innerFcinfo, &info, innerArgCount, fcinfo->fncollation,
+							 fcinfo->context, fcinfo->resultinfo);
+
+	/* TODO: deal with memory management juggling (see executor/nodeAgg) */
+	innerFcinfo.arg[0] = box->value;
+	innerFcinfo.argnull[0] = box->value_null;
+	memcpy(&innerFcinfo.arg[1], &fcinfo->arg[2],
+		   sizeof(Datum) * (innerArgCount - 1));
+	memcpy(&innerFcinfo.argnull[1], &fcinfo->argnull[2],
+		   sizeof(bool) * (innerArgCount - 1));
+
+	box->value = FunctionCallInvoke(&innerFcinfo);
+	box->value_null = innerFcinfo.isnull;
+	PG_RETURN_POINTER(box);
+}
+
+/*
+ * (box) -> box.agg.stype
+ * return box.agg.serialize(box.value)
+ *
+ * NOTE(review): the SQL declaration still says RETURNS internal, which an
+ * aggregate without internal arguments presumably may not return - confirm.
+ */
+Datum
+worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
+{
+	ErrorNotImplemented("worker_partial_agg_ffunc");
+	PG_RETURN_NULL();
+}
+
+/*
+ * (box, agg, valbytes) -> box
+ */
+Datum
+coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
+{
+	ErrorNotImplemented("coord_combine_agg_sfunc");
+	PG_RETURN_NULL();
+}
+
+/*
+ * (box) -> result of the wrapped aggregate
+ * TODO: presumably return box.agg.ffunc(box.value); not designed yet
+ */
+Datum
+coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
+{
+	ErrorNotImplemented("coord_combine_agg_ffunc");
+	PG_RETURN_NULL();
+}