diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 9aef9deaf..11fa640e5 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '9.0-1' +default_version = '9.0-customagg' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 658205487..e763f6944 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1517,7 +1517,7 @@ MasterAggregateExpression(Aggref *originalAggregate, ObjectIdGetDatum(originalAggregate->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) { - elog(WARNING, "cache lookup failed for aggregate %u", + elog(WARNING, "citus cache lookup failed for aggregate %u", originalAggregate->aggfnoid); combine = InvalidOid; } @@ -2808,7 +2808,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, ObjectIdGetDatum(originalAggregate->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) { - elog(WARNING, "cache lookup failed for aggregate %u", + elog(WARNING, "citus cache lookup failed for aggregate %u", originalAggregate->aggfnoid); combine = InvalidOid; } @@ -3006,7 +3006,8 @@ GetAggregateType(Oid aggFunctionId) aggregateProcName = get_func_name(aggFunctionId); if (aggregateProcName == NULL) { - ereport(ERROR, (errmsg("cache lookup failed for function %u", aggFunctionId))); + ereport(ERROR, (errmsg("citus cache lookup failed for function %u", + aggFunctionId))); } aggregateCount = lengthof(AggregateNames); diff --git a/src/backend/distributed/citus--8.4-1--8.4-customagg.sql b/src/backend/distributed/sql/citus--9.0-1--9.0-customagg.sql similarity index 100% rename from src/backend/distributed/citus--8.4-1--8.4-customagg.sql rename to src/backend/distributed/sql/citus--9.0-1--9.0-customagg.sql diff --git a/src/backend/distributed/utils/aggregate_utils.c b/src/backend/distributed/utils/aggregate_utils.c index fb7e06371..6291ec823 100644 --- a/src/backend/distributed/utils/aggregate_utils.c +++ b/src/backend/distributed/utils/aggregate_utils.c @@ -32,6 +32,8 @@ typedef struct StypeBox static HeapTuple get_aggform(Oid oid, Form_pg_aggregate *form); static HeapTuple get_procform(Oid oid, Form_pg_proc *form); static HeapTuple get_typeform(Oid oid, Form_pg_type *form); +static void * pallocInAggContext(FunctionCallInfo fcinfo, size_t size); +static void InitializeStypeBox(StypeBox *box, HeapTuple aggTuple, Oid transtype); static HeapTuple get_aggform(Oid oid, Form_pg_aggregate *form) @@ -39,7 +41,7 @@ get_aggform(Oid oid, Form_pg_aggregate *form) HeapTuple tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(oid)); if (!HeapTupleIsValid(tuple)) { - elog(ERROR, "cache lookup failed for aggregate %u", oid); + elog(ERROR, "citus cache lookup failed for aggregate %u", oid); } *form = (Form_pg_aggregate) GETSTRUCT(tuple); return tuple; @@ -52,7 +54,7 @@ get_procform(Oid oid, Form_pg_proc *form) HeapTuple tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(oid)); if (!HeapTupleIsValid(tuple)) { - elog(ERROR, "cache lookup failed for function %u", oid); + elog(ERROR, "citus cache lookup failed for function %u", oid); } *form = (Form_pg_proc) GETSTRUCT(tuple); return tuple; @@ -65,13 +67,31 @@ get_typeform(Oid oid, Form_pg_type *form) HeapTuple tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oid)); if (!HeapTupleIsValid(tuple)) { - elog(ERROR, "cache lookup failed for type %u", oid); + elog(ERROR, "citus cache lookup failed for type %u", oid); } *form = (Form_pg_type) GETSTRUCT(tuple); return tuple; } +static void * +pallocInAggContext(FunctionCallInfo fcinfo, size_t size) +{ + MemoryContext aggregateContext; + MemoryContext oldContext; + void *result; + if (!AggCheckCallContext(fcinfo, &aggregateContext)) + { + elog(ERROR, "Aggregate function called without an aggregate context"); + } + + oldContext = MemoryContextSwitchTo(aggregateContext); + result = palloc(size); + MemoryContextSwitchTo(oldContext); + return result; +} + + /* * See GetAggInitVal from pg's nodeAgg.c */ @@ -124,6 +144,7 @@ citus_stype_serialize(PG_FUNCTION_ARGS) Datum result; elog(WARNING, "citus_stype_serialize"); + elog(WARNING, "\t%d", box->agg); aggtuple = get_aggform(box->agg, &aggform); serial = aggform->aggserialfn; @@ -132,6 +153,8 @@ citus_stype_serialize(PG_FUNCTION_ARGS) if (serial == InvalidOid) { + elog(WARNING, "\tnoserial, load %d", transtype); + /* TODO do we have to fallback to output/receive if not set? */ /* ie is it possible for send/recv to be unset? */ transtypetuple = get_typeform(transtype, &transtypeform); @@ -207,12 +230,12 @@ citus_stype_deserialize(PG_FUNCTION_ARGS) StringInfoData buf; bool value_null; - elog(WARNING, "citus_stype_deserialize"); - memcpy(&agg, VARDATA(bytes), sizeof(Oid)); memcpy(&value_null, VARDATA(bytes) + sizeof(Oid), sizeof(bool)); - box = palloc(sizeof(StypeBox)); + elog(WARNING, "citus_stype_deserialize %d %d", agg, value_null); + + box = pallocInAggContext(fcinfo, sizeof(StypeBox)); box->agg = agg; if (value_null) { @@ -302,17 +325,18 @@ citus_stype_combine(PG_FUNCTION_ARGS) { PG_RETURN_NULL(); } - box1 = palloc(sizeof(StypeBox)); + + box1 = pallocInAggContext(fcinfo, sizeof(StypeBox)); box1->value = (Datum) 0; box1->value_null = true; box1->agg = box2->agg; } aggtuple = get_aggform(box1->agg, &aggform); - Assert(aggform->combineefn != InvalidOid); combine = aggform->aggcombinefn; ReleaseSysCache(aggtuple); + Assert(combine != InvalidOid); fmgr_info(combine, &info); if (info.fn_strict) @@ -366,7 +390,7 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS) if (is_initial_call) { - box = palloc(sizeof(StypeBox)); + box = pallocInAggContext(fcinfo, sizeof(StypeBox)); box->agg = PG_GETARG_OID(1); } else @@ -375,6 +399,8 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS) Assert(box->agg == PG_GETARG_OID(1)); } + elog(WARNING, "\tbox: %p", box); + elog(WARNING, "\tagg: %d", box->agg); aggtuple = get_aggform(box->agg, &aggform); aggsfunc = aggform->aggtransfn; if (is_initial_call) @@ -412,9 +438,11 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS) fcSetArgExt(inner_fcinfo, i, fcGetArgValue(fcinfo, i + 1), fcGetArgNull(fcinfo, i + 1)); } + elog(WARNING, "invoke sfunc"); box->value = FunctionCallInvoke(inner_fcinfo); box->value_null = inner_fcinfo->isnull; + elog(WARNING, "\tworker sfunc agg: %d", box->agg); elog(WARNING, "\tworker sfunc null: %d", box->value_null); PG_RETURN_POINTER(box); @@ -446,6 +474,8 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } + elog(WARNING, "\tagg: %d", box->agg); + aggtuple = get_aggform(box->agg, &aggform); serial = aggform->aggserialfn; transtype = aggform->aggtranstype; @@ -453,6 +483,8 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS) if (serial == InvalidOid) { + elog(WARNING, "\tload typeform %d", transtype); + /* TODO do we have to fallback to output/receive if not set? */ /* ie is it possible for send/recv to be unset? */ transtypetuple = get_typeform(transtype, &transtypeform); @@ -462,17 +494,21 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS) Assert(serial != InvalidOid); - elog(WARNING, "calling serial %d", serial); + elog(WARNING, "\tcalling serial %d", serial); fmgr_info(serial, &info); if (info.fn_strict && box->value_null) { + elog(WARNING, "\t\t& strict NULL"); PG_RETURN_NULL(); } + elog(WARNING, "\t\tinit inner_fcinfo %ld %d", box->value, box->value_null); InitFunctionCallInfoData(*inner_fcinfo, &info, 1, fcinfo->fncollation, fcinfo->context, fcinfo->resultinfo); fcSetArgExt(inner_fcinfo, 0, box->value, box->value_null); + + elog(WARNING, "\t\tinvoke inner_fcinfo %p %p", info.fn_addr, array_send); result = FunctionCallInvoke(inner_fcinfo); - elog(WARNING, "& done %d", VARSIZE(DatumGetByteaPP(result))); + elog(WARNING, "\t\t& done %d", VARSIZE(DatumGetByteaPP(result))); if (inner_fcinfo->isnull) { PG_RETURN_NULL(); @@ -508,7 +544,8 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS) if (PG_ARGISNULL(0)) { - box = palloc(sizeof(StypeBox)); + box = pallocInAggContext(fcinfo, sizeof(StypeBox)); + box->agg = PG_GETARG_OID(1); box->value = (Datum) 0; box->value_null = true;