Gets as far as calling array_send, which seems to be getting an invalid array, maybe a memory context issue

fix_120_custom_aggregates_distribute_multiarg
Philip Dubé 2019-08-30 19:11:46 +00:00
parent 22c6dc7f84
commit 5b25204dda
4 changed files with 54 additions and 16 deletions

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '9.0-1'
default_version = '9.0-customagg'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -1517,7 +1517,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
ObjectIdGetDatum(originalAggregate->aggfnoid));
if (!HeapTupleIsValid(aggTuple))
{
elog(WARNING, "cache lookup failed for aggregate %u",
elog(WARNING, "citus cache lookup failed for aggregate %u",
originalAggregate->aggfnoid);
combine = InvalidOid;
}
@ -2808,7 +2808,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
ObjectIdGetDatum(originalAggregate->aggfnoid));
if (!HeapTupleIsValid(aggTuple))
{
elog(WARNING, "cache lookup failed for aggregate %u",
elog(WARNING, "citus cache lookup failed for aggregate %u",
originalAggregate->aggfnoid);
combine = InvalidOid;
}
@ -3006,7 +3006,8 @@ GetAggregateType(Oid aggFunctionId)
aggregateProcName = get_func_name(aggFunctionId);
if (aggregateProcName == NULL)
{
ereport(ERROR, (errmsg("cache lookup failed for function %u", aggFunctionId)));
ereport(ERROR, (errmsg("citus cache lookup failed for function %u",
aggFunctionId)));
}
aggregateCount = lengthof(AggregateNames);

View File

@ -32,6 +32,8 @@ typedef struct StypeBox
static HeapTuple get_aggform(Oid oid, Form_pg_aggregate *form);
static HeapTuple get_procform(Oid oid, Form_pg_proc *form);
static HeapTuple get_typeform(Oid oid, Form_pg_type *form);
static void * pallocInAggContext(FunctionCallInfo fcinfo, size_t size);
static void InitializeStypeBox(StypeBox *box, HeapTuple aggTuple, Oid transtype);
static HeapTuple
get_aggform(Oid oid, Form_pg_aggregate *form)
@ -39,7 +41,7 @@ get_aggform(Oid oid, Form_pg_aggregate *form)
HeapTuple tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "cache lookup failed for aggregate %u", oid);
elog(ERROR, "citus cache lookup failed for aggregate %u", oid);
}
*form = (Form_pg_aggregate) GETSTRUCT(tuple);
return tuple;
@ -52,7 +54,7 @@ get_procform(Oid oid, Form_pg_proc *form)
HeapTuple tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "cache lookup failed for function %u", oid);
elog(ERROR, "citus cache lookup failed for function %u", oid);
}
*form = (Form_pg_proc) GETSTRUCT(tuple);
return tuple;
@ -65,13 +67,31 @@ get_typeform(Oid oid, Form_pg_type *form)
HeapTuple tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "cache lookup failed for type %u", oid);
elog(ERROR, "citus cache lookup failed for type %u", oid);
}
*form = (Form_pg_type) GETSTRUCT(tuple);
return tuple;
}
static void *
pallocInAggContext(FunctionCallInfo fcinfo, size_t size)
{
MemoryContext aggregateContext;
MemoryContext oldContext;
void *result;
if (!AggCheckCallContext(fcinfo, &aggregateContext))
{
elog(ERROR, "Aggregate function called without an aggregate context");
}
oldContext = MemoryContextSwitchTo(aggregateContext);
result = palloc(size);
MemoryContextSwitchTo(oldContext);
return result;
}
/*
* See GetAggInitVal from pg's nodeAgg.c
*/
@ -124,6 +144,7 @@ citus_stype_serialize(PG_FUNCTION_ARGS)
Datum result;
elog(WARNING, "citus_stype_serialize");
elog(WARNING, "\t%d", box->agg);
aggtuple = get_aggform(box->agg, &aggform);
serial = aggform->aggserialfn;
@ -132,6 +153,8 @@ citus_stype_serialize(PG_FUNCTION_ARGS)
if (serial == InvalidOid)
{
elog(WARNING, "\tnoserial, load %d", transtype);
/* TODO do we have to fallback to output/receive if not set? */
/* ie is it possible for send/recv to be unset? */
transtypetuple = get_typeform(transtype, &transtypeform);
@ -207,12 +230,12 @@ citus_stype_deserialize(PG_FUNCTION_ARGS)
StringInfoData buf;
bool value_null;
elog(WARNING, "citus_stype_deserialize");
memcpy(&agg, VARDATA(bytes), sizeof(Oid));
memcpy(&value_null, VARDATA(bytes) + sizeof(Oid), sizeof(bool));
box = palloc(sizeof(StypeBox));
elog(WARNING, "citus_stype_deserialize %d %d", agg, value_null);
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = agg;
if (value_null)
{
@ -302,17 +325,18 @@ citus_stype_combine(PG_FUNCTION_ARGS)
{
PG_RETURN_NULL();
}
box1 = palloc(sizeof(StypeBox));
box1 = pallocInAggContext(fcinfo, sizeof(StypeBox));
box1->value = (Datum) 0;
box1->value_null = true;
box1->agg = box2->agg;
}
aggtuple = get_aggform(box1->agg, &aggform);
Assert(aggform->combineefn != InvalidOid);
combine = aggform->aggcombinefn;
ReleaseSysCache(aggtuple);
Assert(combine != InvalidOid);
fmgr_info(combine, &info);
if (info.fn_strict)
@ -366,7 +390,7 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
if (is_initial_call)
{
box = palloc(sizeof(StypeBox));
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = PG_GETARG_OID(1);
}
else
@ -375,6 +399,8 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
Assert(box->agg == PG_GETARG_OID(1));
}
elog(WARNING, "\tbox: %p", box);
elog(WARNING, "\tagg: %d", box->agg);
aggtuple = get_aggform(box->agg, &aggform);
aggsfunc = aggform->aggtransfn;
if (is_initial_call)
@ -412,9 +438,11 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
fcSetArgExt(inner_fcinfo, i, fcGetArgValue(fcinfo, i + 1), fcGetArgNull(fcinfo,
i + 1));
}
elog(WARNING, "invoke sfunc");
box->value = FunctionCallInvoke(inner_fcinfo);
box->value_null = inner_fcinfo->isnull;
elog(WARNING, "\tworker sfunc agg: %d", box->agg);
elog(WARNING, "\tworker sfunc null: %d", box->value_null);
PG_RETURN_POINTER(box);
@ -446,6 +474,8 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
elog(WARNING, "\tagg: %d", box->agg);
aggtuple = get_aggform(box->agg, &aggform);
serial = aggform->aggserialfn;
transtype = aggform->aggtranstype;
@ -453,6 +483,8 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
if (serial == InvalidOid)
{
elog(WARNING, "\tload typeform %d", transtype);
/* TODO do we have to fallback to output/receive if not set? */
/* ie is it possible for send/recv to be unset? */
transtypetuple = get_typeform(transtype, &transtypeform);
@ -462,17 +494,21 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
Assert(serial != InvalidOid);
elog(WARNING, "calling serial %d", serial);
elog(WARNING, "\tcalling serial %d", serial);
fmgr_info(serial, &info);
if (info.fn_strict && box->value_null)
{
elog(WARNING, "\t\t& strict NULL");
PG_RETURN_NULL();
}
elog(WARNING, "\t\tinit inner_fcinfo %ld %d", box->value, box->value_null);
InitFunctionCallInfoData(*inner_fcinfo, &info, 1, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(inner_fcinfo, 0, box->value, box->value_null);
elog(WARNING, "\t\tinvoke inner_fcinfo %p %p", info.fn_addr, array_send);
result = FunctionCallInvoke(inner_fcinfo);
elog(WARNING, "& done %d", VARSIZE(DatumGetByteaPP(result)));
elog(WARNING, "\t\t& done %d", VARSIZE(DatumGetByteaPP(result)));
if (inner_fcinfo->isnull)
{
PG_RETURN_NULL();
@ -508,7 +544,8 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
if (PG_ARGISNULL(0))
{
box = palloc(sizeof(StypeBox));
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = PG_GETARG_OID(1);
box->value = (Datum) 0;
box->value_null = true;