mirror of https://github.com/citusdata/citus.git
Clean up code to point that it compiles
parent
186f632f88
commit
baef1d2f85
|
@ -1517,19 +1517,22 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
aggTuple = SearchSysCache1(AGGFNOID,
|
aggTuple = SearchSysCache1(AGGFNOID,
|
||||||
ObjectIdGetDatum(originalAggregate->aggfnoid));
|
ObjectIdGetDatum(originalAggregate->aggfnoid));
|
||||||
if (!HeapTupleIsValid(aggTuple))
|
if (!HeapTupleIsValid(aggTuple))
|
||||||
|
{
|
||||||
elog(ERROR, "cache lookup failed for aggregate %u",
|
elog(ERROR, "cache lookup failed for aggregate %u",
|
||||||
originalAggregate->aggfnoid);
|
originalAggregate->aggfnoid);
|
||||||
|
}
|
||||||
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
|
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
|
||||||
combine = aggform->aggcombinefn;
|
combine = aggform->aggcombinefn;
|
||||||
if (IsValidOid(combine) && originalAggregate->aggtranstype == INTERNALOID) {
|
if (combine != InvalidOid && originalAggregate->aggtranstype == INTERNALOID)
|
||||||
|
{
|
||||||
serial = aggform->aggserialfn;
|
serial = aggform->aggserialfn;
|
||||||
deserial = aggform->aggdeserialfn;
|
deserial = aggform->aggdeserialfn;
|
||||||
}
|
}
|
||||||
ReleaseSysCache(aggTuple);
|
ReleaseSysCache(aggTuple);
|
||||||
|
|
||||||
if (IsValidOid(combine)) {
|
if (combine != InvalidOid)
|
||||||
|
{ }
|
||||||
} else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
|
else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
|
||||||
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
|
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
|
||||||
walkerContext->pullDistinctColumns)
|
walkerContext->pullDistinctColumns)
|
||||||
{
|
{
|
||||||
|
|
|
@ -4,9 +4,9 @@
|
||||||
#include "catalog/pg_aggregate.h"
|
#include "catalog/pg_aggregate.h"
|
||||||
#include "catalog/pg_proc.h"
|
#include "catalog/pg_proc.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "utils/cache/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/fmgr.h"
|
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
#include "fmgr.h"
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(stypebox_serialize);
|
PG_FUNCTION_INFO_V1(stypebox_serialize);
|
||||||
PG_FUNCTION_INFO_V1(stypebox_deserialize);
|
PG_FUNCTION_INFO_V1(stypebox_deserialize);
|
||||||
|
@ -16,49 +16,59 @@ PG_FUNCTION_INFO_V1(worker_partial_agg_ffunc);
|
||||||
PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
|
PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
|
||||||
PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
|
PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
|
||||||
|
|
||||||
// TODO nodeAgg seems to decide to use serial/deserial based on stype == internal
|
/* TODO nodeAgg seems to decide to use serial/deserial based on stype == internal */
|
||||||
// Preferably we should match that logic, instead of checking serial/deserial oids
|
/* Preferably we should match that logic, instead of checking serial/deserial oids */
|
||||||
|
|
||||||
typedef struct StypeBox {
|
typedef struct StypeBox
|
||||||
|
{
|
||||||
Datum value;
|
Datum value;
|
||||||
Oid agg;
|
Oid agg;
|
||||||
bool value_null;
|
bool value_null;
|
||||||
} StypeBox;
|
} StypeBox;
|
||||||
|
|
||||||
static HeapTuple *get_aggform(Oid oid, Form_pg_aggregate *form);
|
static HeapTuple get_aggform(Oid oid, Form_pg_aggregate *form);
|
||||||
static HeapTuple *get_procform(Oid oid, Form_pg_proc *form);
|
static HeapTuple get_procform(Oid oid, Form_pg_proc *form);
|
||||||
static HeapTuple *get_typeform(Oid oid, Form_pg_type *form);
|
static HeapTuple get_typeform(Oid oid, Form_pg_type *form);
|
||||||
|
|
||||||
static HeapTuple *
|
static HeapTuple
|
||||||
get_aggform(Oid oid, Form_pg_aggregate *form)
|
get_aggform(Oid oid, Form_pg_aggregate *form)
|
||||||
{
|
{
|
||||||
HeapTuple tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(oid));
|
HeapTuple tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(oid));
|
||||||
if (!HeapTupleIsValid(tuple))
|
if (!HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
elog(ERROR, "cache lookup failed for aggregate %u", oid);
|
elog(ERROR, "cache lookup failed for aggregate %u", oid);
|
||||||
|
}
|
||||||
*form = (Form_pg_aggregate) GETSTRUCT(tuple);
|
*form = (Form_pg_aggregate) GETSTRUCT(tuple);
|
||||||
return tuple;
|
return tuple;
|
||||||
}
|
}
|
||||||
|
|
||||||
static HeapTuple *
|
|
||||||
get_procform(Oid oid)
|
static HeapTuple
|
||||||
|
get_procform(Oid oid, Form_pg_proc *form)
|
||||||
{
|
{
|
||||||
HeapTuple tuple = SearchSysCache1(PROCID, ObjectIdGetDatum(fnoid));
|
HeapTuple tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(oid));
|
||||||
if (!HeapTupleIsValid(tuple))
|
if (!HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
elog(ERROR, "cache lookup failed for function %u", oid);
|
elog(ERROR, "cache lookup failed for function %u", oid);
|
||||||
|
}
|
||||||
*form = (Form_pg_proc) GETSTRUCT(tuple);
|
*form = (Form_pg_proc) GETSTRUCT(tuple);
|
||||||
return tuple;
|
return tuple;
|
||||||
}
|
}
|
||||||
|
|
||||||
static HeapTuple *
|
|
||||||
get_typeform(Oid oid)
|
static HeapTuple
|
||||||
|
get_typeform(Oid oid, Form_pg_type *form)
|
||||||
{
|
{
|
||||||
HeapTuple tuple = SearchSysCache1(TYPEID, ObjectIdGetDatum(oid));
|
HeapTuple tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oid));
|
||||||
if (!HeapTupleIsValid(tuple))
|
if (!HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
elog(ERROR, "cache lookup failed for type %u", oid);
|
elog(ERROR, "cache lookup failed for type %u", oid);
|
||||||
|
}
|
||||||
*form = (Form_pg_type) GETSTRUCT(tuple);
|
*form = (Form_pg_type) GETSTRUCT(tuple);
|
||||||
return tuple;
|
return tuple;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box) -> bytea
|
* (box) -> bytea
|
||||||
* return bytes(box.agg.oid, box.agg.serial(box.value))
|
* return bytes(box.agg.oid, box.agg.serial(box.value))
|
||||||
|
@ -66,15 +76,17 @@ get_typeform(Oid oid)
|
||||||
Datum
|
Datum
|
||||||
stypebox_serialize(PG_FUNCTION_ARGS)
|
stypebox_serialize(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
StypeBox *box = PG_GETARG_POINTER(0);
|
FmgrInfo infodata;
|
||||||
HeapTuple *aggtuple;
|
FmgrInfo *info = &infodata;
|
||||||
HeapTuple *transtypetuple;
|
StypeBox *box = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
|
HeapTuple aggtuple;
|
||||||
|
HeapTuple transtypetuple;
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
Form_pg_type transtypeform;
|
Form_pg_type transtypeform;
|
||||||
byteap *valbytes;
|
bytea *valbytes;
|
||||||
byteap *realbytes;
|
bytea *realbytes;
|
||||||
Oid serial;
|
Oid serial;
|
||||||
Oid transtype;
|
Oid transtype;
|
||||||
Size valbyteslen_exhdr;
|
Size valbyteslen_exhdr;
|
||||||
|
@ -82,20 +94,20 @@ stypebox_serialize(PG_FUNCTION_ARGS)
|
||||||
Datum result;
|
Datum result;
|
||||||
|
|
||||||
aggtuple = get_aggform(box->agg, &aggform);
|
aggtuple = get_aggform(box->agg, &aggform);
|
||||||
serial = aggform->serialfunc;
|
serial = aggform->aggserialfn;
|
||||||
transtype = aggform->aggtranstype;
|
transtype = aggform->aggtranstype;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
if (!IsValidOid(serial))
|
if (serial != InvalidOid)
|
||||||
{
|
{
|
||||||
// TODO do we have to fallback to output/receive if not set?
|
/* TODO do we have to fallback to output/receive if not set? */
|
||||||
// ie is it possible for send/recv to be unset?
|
/* ie is it possible for send/recv to be unset? */
|
||||||
transtypetuple = get_typeform(transtype, &transtypeform);
|
transtypetuple = get_typeform(transtype, &transtypeform);
|
||||||
serial = transtypeform->typsend;
|
serial = transtypeform->typsend;
|
||||||
ReleaseSysCache(transtypetuple);
|
ReleaseSysCache(transtypetuple);
|
||||||
}
|
}
|
||||||
|
|
||||||
fmgr_info(serial, &info);
|
fmgr_info(serial, info);
|
||||||
if (info->fn_strict && box->value_null)
|
if (info->fn_strict && box->value_null)
|
||||||
{
|
{
|
||||||
valbytes = NULL;
|
valbytes = NULL;
|
||||||
|
@ -103,11 +115,12 @@ stypebox_serialize(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, 1, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, info, 1, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = box->value;
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.argnull[0] = box->value_null;
|
inner_fcinfo->arg[0] = box->value;
|
||||||
result = FunctionCallInvoke(&inner_fcinfo);
|
inner_fcinfo->argnull[0] = box->value_null;
|
||||||
if (inner_fcinfo.isnull)
|
result = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
if (inner_fcinfo->isnull)
|
||||||
{
|
{
|
||||||
valbytes = NULL;
|
valbytes = NULL;
|
||||||
valbyteslen_exhdr = 0;
|
valbyteslen_exhdr = 0;
|
||||||
|
@ -129,12 +142,13 @@ stypebox_serialize(PG_FUNCTION_ARGS)
|
||||||
memcpy(VARDATA(realbytes) + sizeof(Oid) + sizeof(bool),
|
memcpy(VARDATA(realbytes) + sizeof(Oid) + sizeof(bool),
|
||||||
VARDATA(valbytes),
|
VARDATA(valbytes),
|
||||||
valbyteslen_exhdr);
|
valbyteslen_exhdr);
|
||||||
pfree(valbytes); // TODO I get to free this right?
|
pfree(valbytes); /* TODO I get to free this right? */
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_BYTEA_P(valbytes);
|
PG_RETURN_BYTEA_P(valbytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (bytea, internal) -> box
|
* (bytea, internal) -> box
|
||||||
* box->agg = readagg(bytea)
|
* box->agg = readagg(bytea)
|
||||||
|
@ -146,11 +160,11 @@ Datum
|
||||||
stypebox_deserialize(PG_FUNCTION_ARGS)
|
stypebox_deserialize(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
StypeBox *box;
|
StypeBox *box;
|
||||||
byteap *bytes = PG_GETARG_BYTEA_PP(0);
|
bytea *bytes = PG_GETARG_BYTEA_PP(0);
|
||||||
byteap *inner_bytes;
|
bytea *inner_bytes;
|
||||||
Oid agg;
|
Oid agg;
|
||||||
HeapTuple *aggtuple;
|
HeapTuple aggtuple;
|
||||||
HeapTuple *transtypetuple;
|
HeapTuple transtypetuple;
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
Form_pg_type transtypeform;
|
Form_pg_type transtypeform;
|
||||||
Oid deserial;
|
Oid deserial;
|
||||||
|
@ -173,14 +187,27 @@ stypebox_deserialize(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
aggtuple = get_aggform(agg, &aggform);
|
aggtuple = get_aggform(agg, &aggform);
|
||||||
deserial = aggform->deserialfunc;
|
deserial = aggform->aggdeserialfn;
|
||||||
transtype = aggform->aggtranstype;
|
transtype = aggform->aggtranstype;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
if (aggform->deserialfunc) {
|
if (deserial != InvalidOid)
|
||||||
inner_bytes = PG_GETARG_BYTEA_P_SLICE(0, sizeof(Oid), VARSIZE(bytes) - sizeof(Oid))
|
{
|
||||||
box->value = DirectFunctionCall2(deserial, inner_bytes, PG_GETARG_DATUM(1));
|
FmgrInfo fdeserialinfo;
|
||||||
box->null_value = false;
|
FunctionCallInfoData fdeserial_callinfodata;
|
||||||
|
|
||||||
|
inner_bytes = PG_GETARG_BYTEA_P_SLICE(0, sizeof(Oid), VARSIZE(bytes) -
|
||||||
|
sizeof(Oid));
|
||||||
|
fmgr_info(deserial, &fdeserialinfo);
|
||||||
|
InitFunctionCallInfoData(fdeserial_callinfodata, &fdeserialinfo, 2,
|
||||||
|
fcinfo->fncollation, fcinfo->context,
|
||||||
|
fcinfo->resultinfo);
|
||||||
|
fdeserial_callinfodata.arg[0] = PointerGetDatum(inner_bytes);
|
||||||
|
fdeserial_callinfodata.argnull[0] = false;
|
||||||
|
fdeserial_callinfodata.arg[1] = PG_GETARG_DATUM(1);
|
||||||
|
fdeserial_callinfodata.argnull[1] = false;
|
||||||
|
box->value = FunctionCallInvoke(&fdeserial_callinfodata);
|
||||||
|
box->value_null = fdeserial_callinfodata.isnull;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -190,9 +217,9 @@ stypebox_deserialize(PG_FUNCTION_ARGS)
|
||||||
ReleaseSysCache(transtypetuple);
|
ReleaseSysCache(transtypetuple);
|
||||||
|
|
||||||
initStringInfo(&buf);
|
initStringInfo(&buf);
|
||||||
appendBinaryStringInfo(buf,
|
appendBinaryStringInfo(&buf,
|
||||||
VARDATA(realbytes) + sizeof(Oid) + sizeof(bool),
|
VARDATA(bytes) + sizeof(Oid) + sizeof(bool),
|
||||||
VARSIZE(realbytes) - VARHDRSZ - sizeof(Oid) - sizeof(bool));
|
VARSIZE(bytes) - VARHDRSZ - sizeof(Oid) - sizeof(bool));
|
||||||
|
|
||||||
box->value = OidReceiveFunctionCall(recv, &buf, ioparam, -1);
|
box->value = OidReceiveFunctionCall(recv, &buf, ioparam, -1);
|
||||||
box->value_null = value_null;
|
box->value_null = value_null;
|
||||||
|
@ -201,6 +228,7 @@ stypebox_deserialize(PG_FUNCTION_ARGS)
|
||||||
PG_RETURN_POINTER(box);
|
PG_RETURN_POINTER(box);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box1, box2) -> box
|
* (box1, box2) -> box
|
||||||
* box1.value = box.agg.combine(box1.value, box2.value)
|
* box1.value = box.agg.combine(box1.value, box2.value)
|
||||||
|
@ -211,19 +239,20 @@ stypebox_combine(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
StypeBox *box1 = NULL;
|
StypeBox *box1 = NULL;
|
||||||
StypeBox *box2 = NULL;
|
StypeBox *box2 = NULL;
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FmgrInfo info;
|
||||||
Oid aggOid;
|
Oid combine;
|
||||||
HeapTuple *aggtuple;
|
HeapTuple aggtuple;
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
|
|
||||||
if (!PG_ISARGNULL(0))
|
if (!PG_ARGISNULL(0))
|
||||||
{
|
{
|
||||||
box1 = PG_GETARG_POINTER(0);
|
box1 = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
}
|
}
|
||||||
if (!PG_ISARGNULL(1))
|
if (!PG_ARGISNULL(1))
|
||||||
{
|
{
|
||||||
box2 = PG_GETARG_POINTER(1);
|
box2 = (StypeBox *) PG_GETARG_POINTER(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (box1 == NULL)
|
if (box1 == NULL)
|
||||||
|
@ -239,8 +268,8 @@ stypebox_combine(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
aggtuple = get_aggform(box1->agg, &aggform);
|
aggtuple = get_aggform(box1->agg, &aggform);
|
||||||
Assert(IsValidOid(aggform->combineefn));
|
Assert(aggform->combineefn != InvalidOid);
|
||||||
combine = aggform->combinefn;
|
combine = aggform->aggcombinefn;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
fmgr_info(combine, &info);
|
fmgr_info(combine, &info);
|
||||||
|
@ -261,18 +290,20 @@ stypebox_combine(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, 2, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, 2, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = box1->value;
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.argnull[0] = box1->value_null;
|
inner_fcinfo->arg[0] = box1->value;
|
||||||
inner_fcinfo.arg[1] = box2->value;
|
inner_fcinfo->argnull[0] = box1->value_null;
|
||||||
inner_fcinfo.argnull[1] = box2->value_null;
|
inner_fcinfo->arg[1] = box2->value;
|
||||||
// TODO Deal with memory management juggling (see executor/nodeAgg)
|
inner_fcinfo->argnull[1] = box2->value_null;
|
||||||
box1->value = FunctionCallInvoke(&inner_fcinfo);
|
/* TODO Deal with memory management juggling (see executor/nodeAgg) */
|
||||||
box1->value_null = inner_fcinfo.isnull;
|
box1->value = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
box1->value_null = inner_fcinfo->isnull;
|
||||||
|
|
||||||
PG_RETURN_POINTER(box1);
|
PG_RETURN_POINTER(box1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box, agg, ...) -> box
|
* (box, agg, ...) -> box
|
||||||
* box.agg = agg;
|
* box.agg = agg;
|
||||||
|
@ -283,10 +314,12 @@ Datum
|
||||||
worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
StypeBox *box;
|
StypeBox *box;
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FmgrInfo info;
|
||||||
int i;
|
int i;
|
||||||
if (PG_ARGISNULL(0)) {
|
if (PG_ARGISNULL(0))
|
||||||
|
{
|
||||||
box = palloc(sizeof(StypeBox));
|
box = palloc(sizeof(StypeBox));
|
||||||
box->agg = PG_GETARG_OID(1);
|
box->agg = PG_GETARG_OID(1);
|
||||||
box->value = (Datum) 0;
|
box->value = (Datum) 0;
|
||||||
|
@ -294,12 +327,14 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
box = PG_GETARG_POINTER(0);
|
box = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
Assert(box->agg == PG_GETARG_OID(1));
|
Assert(box->agg == PG_GETARG_OID(1));
|
||||||
}
|
}
|
||||||
fmgr_info(box->agg, &info);
|
fmgr_info(box->agg, &info);
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, fcinfo->nargs - 1, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, fcinfo->nargs - 1, fcinfo->fncollation,
|
||||||
if (info.flinfo->fn_strict) {
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
|
if (info.fn_strict)
|
||||||
|
{
|
||||||
if (box->value_null)
|
if (box->value_null)
|
||||||
{
|
{
|
||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
|
@ -312,16 +347,19 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Deal with memory management juggling (see executor/nodeAgg)
|
/* Deal with memory management juggling (see executor/nodeAgg) */
|
||||||
inner_fcinfo.arg[0] = box->value;
|
inner_fcinfo->arg[0] = box->value;
|
||||||
inner_fcinfo.argnull[0] = box->value_null;
|
inner_fcinfo->argnull[0] = box->value_null;
|
||||||
memcpy(&inner_fcinfo.arg[1], &fcinfo.arg[2], sizeof(Datum) * (inner_fcinfo.nargs - 1));
|
memcpy(&inner_fcinfo->arg[1], &fcinfo->arg[2], sizeof(Datum) * (inner_fcinfo->nargs -
|
||||||
memcpy(&inner_fcinfo.argnull[1], &fcinfo.argnull[2], sizeof(bool) * (inner_fcinfo.nargs - 1));
|
1));
|
||||||
box->value = FunctionCallInvoke(&inner_fcinfo);
|
memcpy(&inner_fcinfo->argnull[1], &fcinfo->argnull[2], sizeof(bool) *
|
||||||
box->value_null = inner_fcinfo.isnull;
|
(inner_fcinfo->nargs - 1));
|
||||||
|
box->value = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
box->value_null = inner_fcinfo->isnull;
|
||||||
PG_RETURN_POINTER(box);
|
PG_RETURN_POINTER(box);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box) -> box.agg.stype
|
* (box) -> box.agg.stype
|
||||||
* return box.agg.serialize(box.value)
|
* return box.agg.serialize(box.value)
|
||||||
|
@ -329,30 +367,32 @@ worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
|
worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FmgrInfo info;
|
||||||
StypeBox *box = PG_GETARG_POINTER(0);
|
StypeBox *box = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
HeapTuple *aggtuple;
|
HeapTuple aggtuple;
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
Oid serial;
|
Oid serial;
|
||||||
Datum result;
|
Datum result;
|
||||||
|
|
||||||
aggtuple = get_aggform(box->agg);
|
aggtuple = get_aggform(box->agg, &aggform);
|
||||||
serial = aggform->serialfunc;
|
serial = aggform->aggserialfn;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
if (IsValidOid(serial))
|
if (serial != InvalidOid)
|
||||||
{
|
{
|
||||||
fmgr_info(serial, &info);
|
fmgr_info(serial, &info);
|
||||||
if (info->fn_strict && box->value_null)
|
if (info.fn_strict && box->value_null)
|
||||||
{
|
{
|
||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
}
|
}
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, 1, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, 1, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = box->value;
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.argnull[0] = box->value_null;
|
inner_fcinfo->arg[0] = box->value;
|
||||||
result = FunctionCallInvoke(&inner_fcinfo);
|
inner_fcinfo->argnull[0] = box->value_null;
|
||||||
if (inner_fcinfo.isnull)
|
result = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
if (inner_fcinfo->isnull)
|
||||||
{
|
{
|
||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
}
|
}
|
||||||
|
@ -368,6 +408,7 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box, agg, valbytes|value) -> box
|
* (box, agg, valbytes|value) -> box
|
||||||
* box->agg = agg
|
* box->agg = agg
|
||||||
|
@ -378,18 +419,19 @@ worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FmgrInfo info;
|
||||||
HeapTuple *aggtuple;
|
HeapTuple aggtuple;
|
||||||
HeapTuple *transtypetuple;
|
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
Form_pg_type transtypeform;
|
|
||||||
Oid combine;
|
Oid combine;
|
||||||
Oid deserial;
|
Oid deserial;
|
||||||
Datum value;
|
Datum value;
|
||||||
bool value_null;
|
bool value_null;
|
||||||
|
StypeBox *box;
|
||||||
|
|
||||||
if (PG_ARGISNULL(0)) {
|
if (PG_ARGISNULL(0))
|
||||||
|
{
|
||||||
box = palloc(sizeof(StypeBox));
|
box = palloc(sizeof(StypeBox));
|
||||||
box->agg = PG_GETARG_OID(1);
|
box->agg = PG_GETARG_OID(1);
|
||||||
box->value = (Datum) 0;
|
box->value = (Datum) 0;
|
||||||
|
@ -397,28 +439,29 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
box = PG_GETARG_POINTER(0);
|
box = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
Assert(box->agg == PG_GETARG_OID(1));
|
Assert(box->agg == PG_GETARG_OID(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
aggtuple = get_aggform(box->agg, &aggform);
|
aggtuple = get_aggform(box->agg, &aggform);
|
||||||
deserial = aggform->deserialfunc;
|
deserial = aggform->aggdeserialfn;
|
||||||
combine = aggform->combinefn;
|
combine = aggform->aggcombinefn;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
value_null = PG_ISARGNULL(2);
|
value_null = PG_ARGISNULL(2);
|
||||||
if (IsOidValid(deserial))
|
if (deserial != InvalidOid)
|
||||||
{
|
{
|
||||||
fmgr_info(deserial, &info);
|
fmgr_info(deserial, &info);
|
||||||
if (!value_null || !info->fn_strict)
|
if (!value_null || !info.fn_strict)
|
||||||
{
|
{
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, 2, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, 2, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = value_null ? 0 (Datum) : PG_GETARG_BYTEA_PP(2);
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.arg[1] = 0 (Datum);
|
inner_fcinfo->arg[0] = value_null ? (Datum) 0 : PG_GETARG_DATUM(2);
|
||||||
inner_fcinfo.argnull[0] = value_null;
|
inner_fcinfo->arg[1] = (Datum) 0;
|
||||||
inner_fcinfo.argnull[1] = true;
|
inner_fcinfo->argnull[0] = value_null;
|
||||||
value = FunctionCallInvoke(&inner_fcinfo);
|
inner_fcinfo->argnull[1] = true;
|
||||||
value_null = inner_fcinfo.isnull;
|
value = FunctionCallInvoke(inner_fcinfo);
|
||||||
|
value_null = inner_fcinfo->isnull;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -427,12 +470,13 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
value = value_null ? (Datum) 0 : PG_GETARG(2);
|
value = value_null ? (Datum) 0 : PG_GETARG_DATUM(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
fmgr_info(combine, &info);
|
fmgr_info(combine, &info);
|
||||||
|
|
||||||
if (info.fn_strict) {
|
if (info.fn_strict)
|
||||||
|
{
|
||||||
if (box->value_null)
|
if (box->value_null)
|
||||||
{
|
{
|
||||||
if (value_null)
|
if (value_null)
|
||||||
|
@ -449,18 +493,19 @@ coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
InitFunctionCallInfoData(&innerfcinfo, &info, 2, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, 2, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = box->value;
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.argnull[0] = box->value_null;
|
inner_fcinfo->arg[0] = box->value;
|
||||||
inner_fcinfo.arg[1] = value;
|
inner_fcinfo->argnull[0] = box->value_null;
|
||||||
inner_fcinfo.argnull[1] = value_null;
|
inner_fcinfo->arg[1] = value;
|
||||||
box->value = DirectFunctionCall2(combine, box->value, value);
|
inner_fcinfo->argnull[1] = value_null;
|
||||||
box->value = FunctionCallInvoke(&inner_fcinfo);
|
box->value = FunctionCallInvoke(inner_fcinfo);
|
||||||
box->value_null = inner_fcinfo.isnull;
|
box->value_null = inner_fcinfo->isnull;
|
||||||
|
|
||||||
PG_RETURN_POINTER(box);
|
PG_RETURN_POINTER(box);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (box, ...) -> fval
|
* (box, ...) -> fval
|
||||||
* return box.agg.ffunc(box.value)
|
* return box.agg.ffunc(box.value)
|
||||||
|
@ -469,12 +514,13 @@ Datum
|
||||||
coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
|
coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Datum ret;
|
Datum ret;
|
||||||
StypeBox *box = PG_GETARG_POINTER(0);
|
StypeBox *box = (StypeBox *) PG_GETARG_POINTER(0);
|
||||||
FunctionCallInfo inner_fcinfo;
|
FunctionCallInfoData inner_fcinfodata;
|
||||||
|
FunctionCallInfo inner_fcinfo = &inner_fcinfodata;
|
||||||
FmgrInfo info;
|
FmgrInfo info;
|
||||||
int inner_nargs;
|
int inner_nargs;
|
||||||
HeapTuple *aggtuple;
|
HeapTuple aggtuple;
|
||||||
HeapTuple *ffunctuple;
|
HeapTuple ffunctuple;
|
||||||
Form_pg_aggregate aggform;
|
Form_pg_aggregate aggform;
|
||||||
Form_pg_proc ffuncform;
|
Form_pg_proc ffuncform;
|
||||||
Oid ffunc;
|
Oid ffunc;
|
||||||
|
@ -487,7 +533,7 @@ coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
fextra = aggform->aggfinalextra;
|
fextra = aggform->aggfinalextra;
|
||||||
ReleaseSysCache(aggtuple);
|
ReleaseSysCache(aggtuple);
|
||||||
|
|
||||||
if (!IsValidOid(ffunc))
|
if (ffunc == InvalidOid)
|
||||||
{
|
{
|
||||||
if (box->value_null)
|
if (box->value_null)
|
||||||
{
|
{
|
||||||
|
@ -515,15 +561,15 @@ coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
|
||||||
inner_nargs = 1;
|
inner_nargs = 1;
|
||||||
}
|
}
|
||||||
fmgr_info(ffunc, &info);
|
fmgr_info(ffunc, &info);
|
||||||
InitFunctionCallInfoData(&inner_fcinfo, &info, inner_nargs, fcinfo->collation, fcinfo->context, fcinfo->resultinfo);
|
InitFunctionCallInfoData(*inner_fcinfo, &info, inner_nargs, fcinfo->fncollation,
|
||||||
inner_fcinfo.arg[0] = box->value;
|
fcinfo->context, fcinfo->resultinfo);
|
||||||
inner_fcinfo.argnull[0] = box->value_null;
|
inner_fcinfo->arg[0] = box->value;
|
||||||
|
inner_fcinfo->argnull[0] = box->value_null;
|
||||||
for (i = 1; i < inner_nargs; i++)
|
for (i = 1; i < inner_nargs; i++)
|
||||||
{
|
{
|
||||||
inner_fcinfo.argnull[i] = true;
|
inner_fcinfo->argnull[i] = true;
|
||||||
}
|
}
|
||||||
ret = FunctionCallInvoke(&inner_fcinfo);
|
ret = FunctionCallInvoke(inner_fcinfo);
|
||||||
fcinfo.isnull = inner_fcinfo.isnull;
|
fcinfo->isnull = inner_fcinfo->isnull;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue