Phase 1 implementation of custom aggregates

Phase 1 seeks to implement minimal infrastructure, so does not include:
	- dynamic generation of support aggregates to handle multiple arguments
	- configuration methods to direct aggregation strategy,
		or mark an aggregate's serialize/deserialize as safe to operate across nodes

Aggregates can be distributed when:
	- they have a single argument
	- they have a combinefunc
	- their transition type is not a pseudotype
pull/3132/head^2
Philip Dubé 2019-06-13 11:24:40 -07:00 committed by Philip Dubé
parent edc7a2ee38
commit 495c0f5117
11 changed files with 1239 additions and 34 deletions

View File

@ -252,12 +252,16 @@ static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
WorkerAggregateWalkerContext *walkerContextry);
static AggregateType GetAggregateType(Oid aggFunctionId);
static Oid AggregateArgumentType(Aggref *aggregate);
static bool AggregateEnabledCustom(Oid aggregateOid);
static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes);
static Oid WorkerPartialAggOid(void);
static Oid CoordCombineAggOid(void);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static Oid TypeOid(Oid schemaId, const char *typeName);
static SortGroupClause * CreateSortGroupClause(Var *column);
/* Local functions forward declarations for count(distinct) approximations */
static char * CountDistinctHashFunctionName(Oid argumentType);
static const char * CountDistinctHashFunctionName(Oid argumentType);
static int CountDistinctStorageSize(double approximationErrorRate);
static Const * MakeIntegerConst(int32 integerValue);
static Const * MakeIntegerConstInt64(int64 integerValue);
@ -1565,7 +1569,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
{
/*
* If enabled, we check for count(distinct) approximations before count
* distincts. For this, we first compute hll_add_agg(hll_hash(column) on
* distincts. For this, we first compute hll_add_agg(hll_hash(column)) on
* worker nodes, and get hll values. We then gather hlls on the master
* node, and compute hll_cardinality(hll_union_agg(hll)).
*/
@ -1842,6 +1846,72 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_CUSTOM)
{
HeapTuple aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(
originalAggregate->aggfnoid));
Form_pg_aggregate aggform;
Oid combine;
if (!HeapTupleIsValid(aggTuple))
{
elog(ERROR, "citus cache lookup failed for aggregate %u",
originalAggregate->aggfnoid);
return NULL;
}
else
{
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
combine = aggform->aggcombinefn;
ReleaseSysCache(aggTuple);
}
if (combine != InvalidOid)
{
Const *aggOidParam = NULL;
Var *column = NULL;
Const *nullTag = NULL;
List *aggArguments = NIL;
Aggref *newMasterAggregate = NULL;
Oid coordCombineId = CoordCombineAggOid();
Oid workerReturnType = CSTRINGOID;
int32 workerReturnTypeMod = -1;
Oid workerCollationId = InvalidOid;
Oid resultType = exprType((Node *) originalAggregate);
aggOidParam = makeConst(OIDOID, -1, InvalidOid, sizeof(Oid),
ObjectIdGetDatum(originalAggregate->aggfnoid),
false, true);
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
workerReturnTypeMod, workerCollationId, columnLevelsUp);
walkerContext->columnId++;
nullTag = makeNullConst(resultType, -1, InvalidOid);
aggArguments = list_make3(makeTargetEntry((Expr *) aggOidParam, 1, NULL,
false),
makeTargetEntry((Expr *) column, 2, NULL, false),
makeTargetEntry((Expr *) nullTag, 3, NULL, false));
/* coord_combine_agg(agg, workercol) */
newMasterAggregate = makeNode(Aggref);
newMasterAggregate->aggfnoid = coordCombineId;
newMasterAggregate->aggtype = originalAggregate->aggtype;
newMasterAggregate->args = aggArguments;
newMasterAggregate->aggkind = AGGKIND_NORMAL;
newMasterAggregate->aggfilter = originalAggregate->aggfilter;
newMasterAggregate->aggtranstype = INTERNALOID;
newMasterAggregate->aggargtypes = list_make3_oid(OIDOID, CSTRINGOID,
resultType);
newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) newMasterAggregate;
}
else
{
elog(ERROR, "Aggregate lacks COMBINEFUNC");
}
}
else
{
/*
@ -1887,6 +1957,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) newMasterAggregate;
}
/*
* Aggregate functions could have changed the return type. If so, we wrap
* the new expression with a conversion function to make it have the same
@ -2798,7 +2869,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
Oid hllSchemaOid = get_extension_schema(hllId);
const char *hllSchemaName = get_namespace_name(hllSchemaOid);
char *hashFunctionName = CountDistinctHashFunctionName(argumentType);
const char *hashFunctionName = CountDistinctHashFunctionName(argumentType);
Oid hashFunctionId = FunctionOid(hllSchemaName, hashFunctionName,
hashArgumentCount);
Oid hashFunctionReturnType = get_func_rettype(hashFunctionId);
@ -2870,6 +2941,67 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);
}
else if (aggregateType == AGGREGATE_CUSTOM)
{
HeapTuple aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(
originalAggregate->aggfnoid));
Form_pg_aggregate aggform;
Oid combine;
if (!HeapTupleIsValid(aggTuple))
{
elog(ERROR, "citus cache lookup failed for aggregate %u",
originalAggregate->aggfnoid);
return NULL;
}
else
{
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
combine = aggform->aggcombinefn;
ReleaseSysCache(aggTuple);
}
if (combine != InvalidOid)
{
Const *aggOidParam = NULL;
Aggref *newWorkerAggregate = NULL;
List *aggArguments = NIL;
ListCell *originalAggArgCell;
Oid workerPartialId = WorkerPartialAggOid();
aggOidParam = makeConst(REGPROCEDUREOID, -1, InvalidOid, sizeof(Oid),
ObjectIdGetDatum(originalAggregate->aggfnoid), false,
true);
aggArguments = list_make1(makeTargetEntry((Expr *) aggOidParam, 1, NULL,
false));
foreach(originalAggArgCell, originalAggregate->args)
{
TargetEntry *arg = lfirst(originalAggArgCell);
TargetEntry *newArg = copyObject(arg);
newArg->resno++;
aggArguments = lappend(aggArguments, newArg);
}
/* worker_partial_agg(agg, ...args) */
newWorkerAggregate = makeNode(Aggref);
newWorkerAggregate->aggfnoid = workerPartialId;
newWorkerAggregate->aggtype = CSTRINGOID;
newWorkerAggregate->args = aggArguments;
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
newWorkerAggregate->aggfilter = originalAggregate->aggfilter;
newWorkerAggregate->aggtranstype = INTERNALOID;
newWorkerAggregate->aggargtypes = lcons_oid(OIDOID,
originalAggregate->aggargtypes);
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
workerAggregateList = list_make1(newWorkerAggregate);
}
else
{
elog(ERROR, "Aggregate lacks COMBINEFUNC");
}
}
else
{
/*
@ -2907,12 +3039,14 @@ GetAggregateType(Oid aggFunctionId)
aggregateProcName = get_func_name(aggFunctionId);
if (aggregateProcName == NULL)
{
ereport(ERROR, (errmsg("cache lookup failed for function %u", aggFunctionId)));
ereport(ERROR, (errmsg("citus cache lookup failed for function %u",
aggFunctionId)));
}
aggregateCount = lengthof(AggregateNames);
Assert(AGGREGATE_INVALID_FIRST == 0);
for (aggregateIndex = 1; aggregateIndex < aggregateCount; aggregateIndex++)
{
const char *aggregateName = AggregateNames[aggregateIndex];
@ -2925,6 +3059,11 @@ GetAggregateType(Oid aggFunctionId)
if (!found)
{
if (AggregateEnabledCustom(aggFunctionId))
{
return AGGREGATE_CUSTOM;
}
ereport(ERROR, (errmsg("unsupported aggregate function %s", aggregateProcName)));
}
@ -2947,6 +3086,48 @@ AggregateArgumentType(Aggref *aggregate)
}
/*
* AggregateEnabledCustom returns whether given aggregate can be
* distributed across workers using worker_partial_agg & coord_combine_agg.
*/
static bool
AggregateEnabledCustom(Oid aggregateOid)
{
HeapTuple aggTuple;
Form_pg_aggregate aggform;
HeapTuple typeTuple;
Form_pg_type typeform;
bool supportsSafeCombine;
aggTuple = SearchSysCache1(AGGFNOID, aggregateOid);
if (!HeapTupleIsValid(aggTuple))
{
elog(ERROR, "citus cache lookup failed.");
}
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
if (aggform->aggcombinefn == InvalidOid)
{
ReleaseSysCache(aggTuple);
return false;
}
typeTuple = SearchSysCache1(TYPEOID, aggform->aggtranstype);
if (!HeapTupleIsValid(typeTuple))
{
elog(ERROR, "citus cache lookup failed.");
}
typeform = (Form_pg_type) GETSTRUCT(typeTuple);
supportsSafeCombine = typeform->typtype != TYPTYPE_PSEUDO;
ReleaseSysCache(aggTuple);
ReleaseSysCache(typeTuple);
return supportsSafeCombine;
}
/*
* AggregateFunctionOid performs a reverse lookup on aggregate function name,
* and returns the corresponding aggregate function oid for the given function
@ -3009,6 +3190,62 @@ AggregateFunctionOid(const char *functionName, Oid inputType)
}
/*
* AggregateFunctionOidWithoutInput performs a reverse lookup on aggregate function name,
* and returns the corresponding aggregate function oid for the given function
* name and input type.
*/
static Oid
CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes)
{
List *aggregateName = list_make2(makeString("citus"), makeString(functionName));
FuncCandidateList clist = FuncnameGetCandidates(aggregateName, numargs, NIL, false,
false, true);
for (; clist; clist = clist->next)
{
if (memcmp(clist->args, argtypes, numargs * sizeof(Oid)) == 0)
{
return clist->oid;
}
}
ereport(ERROR, (errmsg("no matching oid for function: %s", functionName)));
return InvalidOid;
}
/*
* Lookup oid of citus.worker_partial_agg
*/
static Oid
WorkerPartialAggOid()
{
Oid argtypes[] = {
OIDOID,
ANYELEMENTOID,
};
return CitusFunctionOidWithSignature(WORKER_PARTIAL_AGGREGATE_NAME, 2, argtypes);
}
/*
* Lookup oid of citus.coord_combine_agg
*/
static Oid
CoordCombineAggOid()
{
Oid argtypes[] = {
OIDOID,
CSTRINGOID,
ANYELEMENTOID,
};
return CitusFunctionOidWithSignature(COORD_COMBINE_AGGREGATE_NAME, 3, argtypes);
}
/*
* TypeOid looks for a type that has the given name and schema, and returns the
* corresponding type's oid.
@ -3018,8 +3255,8 @@ TypeOid(Oid schemaId, const char *typeName)
{
Oid typeOid;
typeOid = GetSysCacheOid2Compat(TYPENAMENSP, Anum_pg_type_oid, PointerGetDatum(
typeName),
typeOid = GetSysCacheOid2Compat(TYPENAMENSP, Anum_pg_type_oid,
PointerGetDatum(typeName),
ObjectIdGetDatum(schemaId));
return typeOid;
@ -3055,42 +3292,34 @@ CreateSortGroupClause(Var *column)
* CountDistinctHashFunctionName resolves the hll_hash function name to use for
* the given input type, and returns this function name.
*/
static char *
static const char *
CountDistinctHashFunctionName(Oid argumentType)
{
char *hashFunctionName = NULL;
/* resolve hash function name based on input argument type */
switch (argumentType)
{
case INT4OID:
{
hashFunctionName = pstrdup(HLL_HASH_INTEGER_FUNC_NAME);
break;
return HLL_HASH_INTEGER_FUNC_NAME;
}
case INT8OID:
{
hashFunctionName = pstrdup(HLL_HASH_BIGINT_FUNC_NAME);
break;
return HLL_HASH_BIGINT_FUNC_NAME;
}
case TEXTOID:
case BPCHAROID:
case VARCHAROID:
{
hashFunctionName = pstrdup(HLL_HASH_TEXT_FUNC_NAME);
break;
return HLL_HASH_TEXT_FUNC_NAME;
}
default:
{
hashFunctionName = pstrdup(HLL_HASH_ANY_FUNC_NAME);
break;
return HLL_HASH_ANY_FUNC_NAME;
}
}
return hashFunctionName;
}

View File

@ -14,3 +14,56 @@ UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntyp
-- drop function which was used for upgrading from 6.0
-- creation was removed from citus--7.0-1.sql
DROP FUNCTION IF EXISTS pg_catalog.master_initialize_node_metadata;
-- Support infrastructure for distributing aggregation
CREATE FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement)
RETURNS internal
AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement)
IS 'transition function for worker_partial_agg';
CREATE FUNCTION citus.worker_partial_agg_ffunc(internal)
RETURNS cstring
AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.worker_partial_agg_ffunc(internal)
IS 'finalizer for worker_partial_agg';
CREATE FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement)
RETURNS internal
AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement)
IS 'transition function for coord_combine_agg';
CREATE FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement)
RETURNS anyelement
AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement)
IS 'finalizer for coord_combine_agg';
-- select worker_partial_agg(agg, ...)
-- equivalent to
-- select to_cstring(agg_without_ffunc(...))
CREATE AGGREGATE citus.worker_partial_agg(oid, anyelement) (
STYPE = internal,
SFUNC = citus.worker_partial_agg_sfunc,
FINALFUNC = citus.worker_partial_agg_ffunc
);
COMMENT ON AGGREGATE citus.worker_partial_agg(oid, anyelement)
IS 'support aggregate for implementing partial aggregation on workers';
-- select coord_combine_agg(agg, col)
-- equivalent to
-- select agg_ffunc(agg_combine(from_cstring(col)))
CREATE AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement) (
STYPE = internal,
SFUNC = citus.coord_combine_agg_sfunc,
FINALFUNC = citus.coord_combine_agg_ffunc,
FINALFUNC_EXTRA
);
COMMENT ON AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement)
IS 'support aggregate for implementing combining partial aggregate results from workers';

View File

@ -0,0 +1,620 @@
/*-------------------------------------------------------------------------
*
* aggregate_utils.c
*
* Implementation of UDFs distributing execution of aggregates across workers.
*
* When an aggregate has a combinefunc, we use worker_partial_agg to skip
* calling finalfunc on workers, instead passing state to coordinator where
* it uses combinefunc in coord_combine_agg & applying finalfunc only at end.
*
* Copyright Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "distributed/version_compat.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pg_config_manual.h"
PG_FUNCTION_INFO_V1(worker_partial_agg_sfunc);
PG_FUNCTION_INFO_V1(worker_partial_agg_ffunc);
PG_FUNCTION_INFO_V1(coord_combine_agg_sfunc);
PG_FUNCTION_INFO_V1(coord_combine_agg_ffunc);
/*
* internal type for support aggregates to pass transition state alongside
* aggregation bookkeeping
*/
typedef struct StypeBox
{
Datum value;
Oid agg;
Oid transtype;
int16_t transtypeLen;
bool transtypeByVal;
bool valueNull;
bool valueInit;
} StypeBox;
static HeapTuple GetAggregateForm(Oid oid, Form_pg_aggregate *form);
static HeapTuple GetProcForm(Oid oid, Form_pg_proc *form);
static HeapTuple GetTypeForm(Oid oid, Form_pg_type *form);
static void * pallocInAggContext(FunctionCallInfo fcinfo, size_t size);
static void aclcheckAggregate(ObjectType objectType, Oid userOid, Oid funcOid);
static void InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box, HeapTuple aggTuple,
Oid transtype);
static void HandleTransition(StypeBox *box, FunctionCallInfo fcinfo,
FunctionCallInfo innerFcinfo);
static void HandleStrictUninit(StypeBox *box, FunctionCallInfo fcinfo, Datum value);
/*
* GetAggregateForm loads corresponding tuple & Form_pg_aggregate for oid
*/
static HeapTuple
GetAggregateForm(Oid oid, Form_pg_aggregate *form)
{
HeapTuple tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "citus cache lookup failed for aggregate %u", oid);
}
*form = (Form_pg_aggregate) GETSTRUCT(tuple);
return tuple;
}
/*
* GetProcForm loads corresponding tuple & Form_pg_proc for oid
*/
static HeapTuple
GetProcForm(Oid oid, Form_pg_proc *form)
{
HeapTuple tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "citus cache lookup failed for function %u", oid);
}
*form = (Form_pg_proc) GETSTRUCT(tuple);
return tuple;
}
/*
* GetTypeForm loads corresponding tuple & Form_pg_type for oid
*/
static HeapTuple
GetTypeForm(Oid oid, Form_pg_type *form)
{
HeapTuple tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
{
elog(ERROR, "citus cache lookup failed for type %u", oid);
}
*form = (Form_pg_type) GETSTRUCT(tuple);
return tuple;
}
/*
* pallocInAggContext calls palloc in fcinfo's aggregate context
*/
static void *
pallocInAggContext(FunctionCallInfo fcinfo, size_t size)
{
MemoryContext aggregateContext;
if (!AggCheckCallContext(fcinfo, &aggregateContext))
{
elog(ERROR, "Aggregate function called without an aggregate context");
}
return MemoryContextAlloc(aggregateContext, size);
}
/*
* aclcheckAggregate verifies that the given user has ACL_EXECUTE to the given proc
*/
static void
aclcheckAggregate(ObjectType objectType, Oid userOid, Oid funcOid)
{
AclResult aclresult;
if (funcOid != InvalidOid)
{
aclresult = pg_proc_aclcheck(funcOid, userOid, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
{
aclcheck_error(aclresult, objectType, get_func_name(funcOid));
}
}
}
/*
* See GetAggInitVal from pg's nodeAgg.c
*/
static void
InitializeStypeBox(FunctionCallInfo fcinfo, StypeBox *box, HeapTuple aggTuple, Oid
transtype)
{
Datum textInitVal;
Form_pg_aggregate aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
Oid userId = GetUserId();
/* First we make ACL_EXECUTE checks as would be done in nodeAgg.c */
aclcheckAggregate(OBJECT_AGGREGATE, userId, aggform->aggfnoid);
aclcheckAggregate(OBJECT_FUNCTION, userId, aggform->aggfinalfn);
aclcheckAggregate(OBJECT_FUNCTION, userId, aggform->aggtransfn);
aclcheckAggregate(OBJECT_FUNCTION, userId, aggform->aggdeserialfn);
aclcheckAggregate(OBJECT_FUNCTION, userId, aggform->aggserialfn);
aclcheckAggregate(OBJECT_FUNCTION, userId, aggform->aggcombinefn);
textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
Anum_pg_aggregate_agginitval,
&box->valueNull);
box->transtype = transtype;
box->valueInit = !box->valueNull;
if (box->valueNull)
{
box->value = (Datum) 0;
}
else
{
Oid typinput,
typioparam;
char *strInitVal;
MemoryContext aggregateContext;
MemoryContext oldContext;
if (!AggCheckCallContext(fcinfo, &aggregateContext))
{
elog(ERROR, "InitializeStypeBox called from non aggregate context");
}
oldContext = MemoryContextSwitchTo(aggregateContext);
getTypeInputInfo(transtype, &typinput, &typioparam);
strInitVal = TextDatumGetCString(textInitVal);
box->value = OidInputFunctionCall(typinput, strInitVal,
typioparam, -1);
pfree(strInitVal);
MemoryContextSwitchTo(oldContext);
}
}
/*
* HandleTransition copies logic used in nodeAgg's advance_transition_function
* for handling result of transition function.
*/
static void
HandleTransition(StypeBox *box, FunctionCallInfo fcinfo, FunctionCallInfo innerFcinfo)
{
Datum newVal = FunctionCallInvoke(innerFcinfo);
bool newValIsNull = innerFcinfo->isnull;
if (!box->transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(box->value))
{
if (!newValIsNull)
{
MemoryContext aggregateContext;
MemoryContext oldContext;
if (!AggCheckCallContext(fcinfo, &aggregateContext))
{
elog(ERROR,
"HandleTransition called from non aggregate context");
}
oldContext = MemoryContextSwitchTo(aggregateContext);
if (!(DatumIsReadWriteExpandedObject(newVal,
false, box->transtypeLen) &&
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) ==
CurrentMemoryContext))
{
newVal = datumCopy(newVal, box->transtypeByVal, box->transtypeLen);
}
MemoryContextSwitchTo(oldContext);
}
if (!box->valueNull)
{
if (DatumIsReadWriteExpandedObject(box->value,
false, box->transtypeLen))
{
DeleteExpandedObject(box->value);
}
else
{
pfree(DatumGetPointer(box->value));
}
}
}
box->value = newVal;
box->valueNull = newValIsNull;
}
/*
* HandleStrictUninit handles initialization of state for when
* transition function is strict & state has not yet been initialized.
*/
static void
HandleStrictUninit(StypeBox *box, FunctionCallInfo fcinfo, Datum value)
{
MemoryContext aggregateContext;
MemoryContext oldContext;
if (!AggCheckCallContext(fcinfo, &aggregateContext))
{
elog(ERROR, "HandleStrictUninit called from non aggregate context");
}
oldContext = MemoryContextSwitchTo(aggregateContext);
box->value = datumCopy(value, box->transtypeByVal, box->transtypeLen);
MemoryContextSwitchTo(oldContext);
box->valueNull = false;
box->valueInit = true;
}
/*
* worker_partial_agg_sfunc advances transition state,
* essentially implementing the following pseudocode:
*
* (box, agg, ...) -> box
* box.agg = agg;
* box.value = agg.sfunc(box.value, ...);
* return box
*/
Datum
worker_partial_agg_sfunc(PG_FUNCTION_ARGS)
{
StypeBox *box = NULL;
Form_pg_aggregate aggform;
HeapTuple aggtuple;
Oid aggsfunc;
LOCAL_FCINFO(innerFcinfo, FUNC_MAX_ARGS);
FmgrInfo info;
int argumentIndex = 0;
bool initialCall = PG_ARGISNULL(0);
if (initialCall)
{
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = PG_GETARG_OID(1);
}
else
{
box = (StypeBox *) PG_GETARG_POINTER(0);
Assert(box->agg == PG_GETARG_OID(1));
}
aggtuple = GetAggregateForm(box->agg, &aggform);
aggsfunc = aggform->aggtransfn;
if (initialCall)
{
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype);
}
ReleaseSysCache(aggtuple);
if (initialCall)
{
get_typlenbyval(box->transtype,
&box->transtypeLen,
&box->transtypeByVal);
}
fmgr_info(aggsfunc, &info);
if (info.fn_strict)
{
for (argumentIndex = 2; argumentIndex < PG_NARGS(); argumentIndex++)
{
if (PG_ARGISNULL(argumentIndex))
{
PG_RETURN_POINTER(box);
}
}
if (!box->valueInit)
{
HandleStrictUninit(box, fcinfo, PG_GETARG_DATUM(2));
PG_RETURN_POINTER(box);
}
if (box->valueNull)
{
PG_RETURN_POINTER(box);
}
}
InitFunctionCallInfoData(*innerFcinfo, &info, fcinfo->nargs - 1, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, box->value, box->valueNull);
for (argumentIndex = 1; argumentIndex < innerFcinfo->nargs; argumentIndex++)
{
fcSetArgExt(innerFcinfo, argumentIndex, fcGetArgValue(fcinfo, argumentIndex + 1),
fcGetArgNull(fcinfo, argumentIndex + 1));
}
HandleTransition(box, fcinfo, innerFcinfo);
PG_RETURN_POINTER(box);
}
/*
* worker_partial_agg_ffunc serializes transition state,
* essentially implementing the following pseudocode:
*
* (box) -> text
* return box.agg.stype.output(box.value)
*/
Datum
worker_partial_agg_ffunc(PG_FUNCTION_ARGS)
{
LOCAL_FCINFO(innerFcinfo, 1);
FmgrInfo info;
StypeBox *box = (StypeBox *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0));
HeapTuple aggtuple;
Form_pg_aggregate aggform;
Oid typoutput = InvalidOid;
bool typIsVarlena = false;
Oid transtype;
Datum result;
if (box == NULL || box->valueNull)
{
PG_RETURN_NULL();
}
aggtuple = GetAggregateForm(box->agg, &aggform);
if (aggform->aggcombinefn == InvalidOid)
{
ereport(ERROR, (errmsg(
"worker_partial_agg_ffunc expects an aggregate with COMBINEFUNC")));
}
if (aggform->aggtranstype == INTERNALOID)
{
ereport(ERROR,
(errmsg(
"worker_partial_agg_ffunc does not support aggregates with INTERNAL transition state")));
}
transtype = aggform->aggtranstype;
ReleaseSysCache(aggtuple);
getTypeOutputInfo(transtype, &typoutput, &typIsVarlena);
fmgr_info(typoutput, &info);
InitFunctionCallInfoData(*innerFcinfo, &info, 1, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, box->value, box->valueNull);
result = FunctionCallInvoke(innerFcinfo);
if (innerFcinfo->isnull)
{
PG_RETURN_NULL();
}
PG_RETURN_DATUM(result);
}
/*
* coord_combine_agg_sfunc deserializes transition state from worker
* & advances transition state using combinefunc,
* essentially implementing the following pseudocode:
*
* (box, agg, text) -> box
* box.agg = agg
* box.value = agg.combine(box.value, agg.stype.input(text))
* return box
*/
Datum
coord_combine_agg_sfunc(PG_FUNCTION_ARGS)
{
LOCAL_FCINFO(innerFcinfo, 3);
FmgrInfo info;
HeapTuple aggtuple;
HeapTuple transtypetuple;
Form_pg_aggregate aggform;
Form_pg_type transtypeform;
Oid combine;
Oid deserial;
Oid ioparam;
Datum value;
bool valueNull;
StypeBox *box = NULL;
if (PG_ARGISNULL(0))
{
box = pallocInAggContext(fcinfo, sizeof(StypeBox));
box->agg = PG_GETARG_OID(1);
}
else
{
box = (StypeBox *) PG_GETARG_POINTER(0);
Assert(box->agg == PG_GETARG_OID(1));
}
aggtuple = GetAggregateForm(box->agg, &aggform);
if (aggform->aggcombinefn == InvalidOid)
{
ereport(ERROR, (errmsg(
"coord_combine_agg_sfunc expects an aggregate with COMBINEFUNC")));
}
if (aggform->aggtranstype == INTERNALOID)
{
ereport(ERROR,
(errmsg(
"coord_combine_agg_sfunc does not support aggregates with INTERNAL transition state")));
}
combine = aggform->aggcombinefn;
if (PG_ARGISNULL(0))
{
InitializeStypeBox(fcinfo, box, aggtuple, aggform->aggtranstype);
}
ReleaseSysCache(aggtuple);
if (PG_ARGISNULL(0))
{
get_typlenbyval(box->transtype,
&box->transtypeLen,
&box->transtypeByVal);
}
valueNull = PG_ARGISNULL(2);
transtypetuple = GetTypeForm(box->transtype, &transtypeform);
ioparam = getTypeIOParam(transtypetuple);
deserial = transtypeform->typinput;
ReleaseSysCache(transtypetuple);
fmgr_info(deserial, &info);
if (valueNull && info.fn_strict)
{
value = (Datum) 0;
}
else
{
InitFunctionCallInfoData(*innerFcinfo, &info, 3, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, PG_GETARG_DATUM(2), valueNull);
fcSetArg(innerFcinfo, 1, ObjectIdGetDatum(ioparam));
fcSetArg(innerFcinfo, 2, Int32GetDatum(-1)); /* typmod */
value = FunctionCallInvoke(innerFcinfo);
valueNull = innerFcinfo->isnull;
}
fmgr_info(combine, &info);
if (info.fn_strict)
{
if (valueNull)
{
PG_RETURN_POINTER(box);
}
if (!box->valueInit)
{
HandleStrictUninit(box, fcinfo, value);
PG_RETURN_POINTER(box);
}
if (box->valueNull)
{
PG_RETURN_POINTER(box);
}
}
InitFunctionCallInfoData(*innerFcinfo, &info, 2, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, box->value, box->valueNull);
fcSetArgExt(innerFcinfo, 1, value, valueNull);
HandleTransition(box, fcinfo, innerFcinfo);
PG_RETURN_POINTER(box);
}
/*
* coord_combine_agg_ffunc applies finalfunc of aggregate to state,
* essentially implementing the following pseudocode:
*
* (box, ...) -> fval
* return box.agg.ffunc(box.value)
*/
Datum
coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
{
Datum result;
StypeBox *box = (StypeBox *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0));
LOCAL_FCINFO(innerFcinfo, FUNC_MAX_ARGS);
FmgrInfo info;
int innerNargs = 0;
HeapTuple aggtuple;
HeapTuple ffunctuple;
Form_pg_aggregate aggform;
Form_pg_proc ffuncform;
Oid ffunc = InvalidOid;
bool fextra = false;
bool finalStrict = false;
int argumentIndex = 0;
if (box == NULL)
{
/*
* Ideally we'd return initval,
* but we don't know which aggregate we're handling here
*/
PG_RETURN_NULL();
}
aggtuple = GetAggregateForm(box->agg, &aggform);
ffunc = aggform->aggfinalfn;
fextra = aggform->aggfinalextra;
ReleaseSysCache(aggtuple);
if (ffunc == InvalidOid)
{
if (box->valueNull)
{
PG_RETURN_NULL();
}
PG_RETURN_DATUM(box->value);
}
ffunctuple = GetProcForm(ffunc, &ffuncform);
finalStrict = ffuncform->proisstrict;
ReleaseSysCache(ffunctuple);
if (finalStrict && box->valueNull)
{
PG_RETURN_NULL();
}
if (fextra)
{
innerNargs = fcinfo->nargs;
}
else
{
innerNargs = 1;
}
fmgr_info(ffunc, &info);
InitFunctionCallInfoData(*innerFcinfo, &info, innerNargs, fcinfo->fncollation,
fcinfo->context, fcinfo->resultinfo);
fcSetArgExt(innerFcinfo, 0, box->value, box->valueNull);
for (argumentIndex = 1; argumentIndex < innerNargs; argumentIndex++)
{
fcSetArgNull(innerFcinfo, argumentIndex);
}
result = FunctionCallInvoke(innerFcinfo);
fcinfo->isnull = innerFcinfo->isnull;
return result;
}

View File

@ -166,6 +166,7 @@ extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void);
extern Oid DistEnabledCustomAggregatesId(void);
/* index oids */
extern Oid DistNodeNodeIdIndexId(void);

View File

@ -26,6 +26,8 @@
#define ARRAY_CAT_AGGREGATE_NAME "array_cat_agg"
#define JSONB_CAT_AGGREGATE_NAME "jsonb_cat_agg"
#define JSON_CAT_AGGREGATE_NAME "json_cat_agg"
#define WORKER_PARTIAL_AGGREGATE_NAME "worker_partial_agg"
#define COORD_COMBINE_AGGREGATE_NAME "coord_combine_agg"
#define WORKER_COLUMN_FORMAT "worker_column_%d"
/* Definitions related to count(distinct) approximations */
@ -76,9 +78,11 @@ typedef enum
AGGREGATE_HLL_UNION = 17,
AGGREGATE_TOPN_ADD_AGG = 18,
AGGREGATE_TOPN_UNION_AGG = 19,
AGGREGATE_ANY_VALUE = 20
} AggregateType;
AGGREGATE_ANY_VALUE = 20,
/* AGGREGATE_CUSTOM must come last */
AGGREGATE_CUSTOM = 21
} AggregateType;
/*
* PushDownStatus indicates whether a node can be pushed down below its child

View File

@ -33,10 +33,10 @@
#define GetSysCacheOid3Compat GetSysCacheOid3
#define GetSysCacheOid4Compat GetSysCacheOid4
#define fcSetArg(fc, n, argval) \
(((fc)->args[n].isnull = false), ((fc)->args[n].value = (argval)))
#define fcSetArgNull(fc, n) \
(((fc)->args[n].isnull = true), ((fc)->args[n].value = (Datum) 0))
#define fcGetArgValue(fc, n) ((fc)->args[n].value)
#define fcGetArgNull(fc, n) ((fc)->args[n].isnull)
#define fcSetArgExt(fc, n, val, is_null) \
(((fc)->args[n].isnull = (is_null)), ((fc)->args[n].value = (val)))
typedef struct
{
@ -104,10 +104,10 @@ FileCompatFromFileStart(File fileDesc)
FunctionCallInfoData name ## data; \
FunctionCallInfoData *name = &name ## data
#define fcSetArg(fc, n, value) \
(((fc)->argnull[n] = false), ((fc)->arg[n] = (value)))
#define fcSetArgNull(fc, n) \
(((fc)->argnull[n] = true), ((fc)->arg[n] = (Datum) 0))
#define fcGetArgValue(fc, n) ((fc)->arg[n])
#define fcGetArgNull(fc, n) ((fc)->argnull[n])
#define fcSetArgExt(fc, n, val, is_null) \
(((fc)->argnull[n] = (is_null)), ((fc)->arg[n] = (val)))
typedef struct
{
@ -141,4 +141,7 @@ FileCompatFromFileStart(File fileDesc)
#endif /* PG12 */
#define fcSetArg(fc, n, value) fcSetArgExt(fc, n, value, false)
#define fcSetArgNull(fc, n) fcSetArgExt(fc, n, (Datum) 0, true)
#endif /* VERSION_COMPAT_H */

View File

@ -0,0 +1,166 @@
--
-- AGGREGATE SUPPORT
--
-- Tests support for user defined aggregates
create schema aggregate_support;
set search_path to aggregate_support;
-- We test with & without STRICT as our code is responsible for managing these NULL checks
create function sum2_sfunc_strict(state int, x int)
returns int immutable strict language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc_strict(state int)
returns int immutable strict language plpgsql as $$
begin return state * 2;
end;
$$;
create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
sfunc = sum2_sfunc,
stype = int,
finalfunc = sum2_finalfunc,
combinefunc = sum2_sfunc,
initcond = '0'
);
create aggregate sum2_strict (int) (
sfunc = sum2_sfunc_strict,
stype = int,
finalfunc = sum2_finalfunc_strict,
combinefunc = sum2_sfunc_strict
);
select create_distributed_function('sum2(int)');
create_distributed_function
-----------------------------
(1 row)
select create_distributed_function('sum2_strict(int)');
create_distributed_function
-----------------------------
(1 row)
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');
create_distributed_table
--------------------------
(1 row)
insert into aggdata (id, key, val, valf) values (1, 1, 2, 11.2), (2, 1, NULL, 2.1), (3, 2, 2, 3.22), (4, 2, 3, 4.23), (5, 2, 5, 5.25), (6, 3, 4, 63.4), (7, 5, NULL, 75), (8, 6, NULL, NULL), (9, 6, NULL, 96), (10, 7, 8, 1078), (11, 9, 0, 1.19);
select key, sum2(val), sum2_strict(val), stddev(valf)
from aggdata group by key order by key;
key | sum2 | sum2_strict | stddev
-----+------+-------------+------------------
1 | | 4 | 6.43467170879758
2 | 20 | 20 | 1.01500410508201
3 | 8 | 8 |
5 | | |
6 | | |
7 | 16 | 16 |
9 | 0 | 0 |
(7 rows)
-- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397
-- we do not currently support pseudotypes for transition types, so this errors for now
CREATE OR REPLACE FUNCTION first_agg(anyelement, anyelement)
RETURNS anyelement AS $$
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
$$ LANGUAGE SQL STABLE;
CREATE AGGREGATE first (
sfunc = first_agg,
basetype = anyelement,
stype = anyelement,
combinefunc = first_agg
);
CREATE OR REPLACE FUNCTION last_agg(anyelement, anyelement)
RETURNS anyelement AS $$
SELECT $2;
$$ LANGUAGE SQL STABLE;
CREATE AGGREGATE last (
sfunc = last_agg,
basetype = anyelement,
stype = anyelement,
combinefunc = last_agg
);
SELECT create_distributed_function('first(anyelement)');
create_distributed_function
-----------------------------
(1 row)
SELECT create_distributed_function('last(anyelement)');
create_distributed_function
-----------------------------
(1 row)
SELECT key, first(val ORDER BY id), last(val ORDER BY id)
FROM aggdata GROUP BY key ORDER BY key;
ERROR: unsupported aggregate function first
-- test aggregate with stype which is not a by-value datum
-- also test our handling of the aggregate not existing on workers
create function sumstring_sfunc(state text, x text)
returns text immutable language plpgsql as $$
begin return (state::float8 + x::float8)::text;
end;
$$;
create aggregate sumstring(text) (
sfunc = sumstring_sfunc,
stype = text,
combinefunc = sumstring_sfunc,
initcond = '0'
);
select sumstring(valf::text order by id)
from aggdata where valf is not null;
ERROR: function "aggregate_support.sumstring(text)" does not exist
CONTEXT: while executing command on localhost:57637
select create_distributed_function('sumstring(text)');
create_distributed_function
-----------------------------
(1 row)
select sumstring(valf::text order by id)
from aggdata where valf is not null;
sumstring
-----------
1339.59
(1 row)
-- test aggregate with stype that has an expanded read-write form
CREATE FUNCTION array_sort (int[])
RETURNS int[] LANGUAGE SQL AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
create aggregate array_collect_sort(el int) (
sfunc = array_append,
stype = int[],
combinefunc = array_cat,
finalfunc = array_sort,
initcond = '{}'
);
select create_distributed_function('array_collect_sort(int)');
create_distributed_function
-----------------------------
(1 row)
select array_collect_sort(val) from aggdata;
array_collect_sort
-------------------------------------
{0,2,2,3,4,5,8,NULL,NULL,NULL,NULL}
(1 row)
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -391,7 +391,7 @@ SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'add';
$$);
run_command_on_workers
@ -404,7 +404,7 @@ SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'sum2';
$$);
run_command_on_workers

View File

@ -70,7 +70,7 @@ test: multi_subquery_in_where_reference_clause full_join adaptive_executor propa
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
test: multi_reference_table multi_select_for_update relation_access_tracking
test: custom_aggregate_support
test: custom_aggregate_support aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition

View File

@ -0,0 +1,129 @@
--
-- AGGREGATE SUPPORT
--
-- Tests support for user defined aggregates
create schema aggregate_support;
set search_path to aggregate_support;
-- We test with & without STRICT as our code is responsible for managing these NULL checks
create function sum2_sfunc_strict(state int, x int)
returns int immutable strict language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc_strict(state int)
returns int immutable strict language plpgsql as $$
begin return state * 2;
end;
$$;
create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
sfunc = sum2_sfunc,
stype = int,
finalfunc = sum2_finalfunc,
combinefunc = sum2_sfunc,
initcond = '0'
);
create aggregate sum2_strict (int) (
sfunc = sum2_sfunc_strict,
stype = int,
finalfunc = sum2_finalfunc_strict,
combinefunc = sum2_sfunc_strict
);
select create_distributed_function('sum2(int)');
select create_distributed_function('sum2_strict(int)');
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');
insert into aggdata (id, key, val, valf) values (1, 1, 2, 11.2), (2, 1, NULL, 2.1), (3, 2, 2, 3.22), (4, 2, 3, 4.23), (5, 2, 5, 5.25), (6, 3, 4, 63.4), (7, 5, NULL, 75), (8, 6, NULL, NULL), (9, 6, NULL, 96), (10, 7, 8, 1078), (11, 9, 0, 1.19);
select key, sum2(val), sum2_strict(val), stddev(valf)
from aggdata group by key order by key;
-- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397
-- we do not currently support pseudotypes for transition types, so this errors for now
CREATE OR REPLACE FUNCTION first_agg(anyelement, anyelement)
RETURNS anyelement AS $$
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
$$ LANGUAGE SQL STABLE;
CREATE AGGREGATE first (
sfunc = first_agg,
basetype = anyelement,
stype = anyelement,
combinefunc = first_agg
);
CREATE OR REPLACE FUNCTION last_agg(anyelement, anyelement)
RETURNS anyelement AS $$
SELECT $2;
$$ LANGUAGE SQL STABLE;
CREATE AGGREGATE last (
sfunc = last_agg,
basetype = anyelement,
stype = anyelement,
combinefunc = last_agg
);
SELECT create_distributed_function('first(anyelement)');
SELECT create_distributed_function('last(anyelement)');
SELECT key, first(val ORDER BY id), last(val ORDER BY id)
FROM aggdata GROUP BY key ORDER BY key;
-- test aggregate with stype which is not a by-value datum
-- also test our handling of the aggregate not existing on workers
create function sumstring_sfunc(state text, x text)
returns text immutable language plpgsql as $$
begin return (state::float8 + x::float8)::text;
end;
$$;
create aggregate sumstring(text) (
sfunc = sumstring_sfunc,
stype = text,
combinefunc = sumstring_sfunc,
initcond = '0'
);
select sumstring(valf::text order by id)
from aggdata where valf is not null;
select create_distributed_function('sumstring(text)');
select sumstring(valf::text order by id)
from aggdata where valf is not null;
-- test aggregate with stype that has an expanded read-write form
CREATE FUNCTION array_sort (int[])
RETURNS int[] LANGUAGE SQL AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;
create aggregate array_collect_sort(el int) (
sfunc = array_append,
stype = int[],
combinefunc = array_cat,
finalfunc = array_sort,
initcond = '{}'
);
select create_distributed_function('array_collect_sort(int)');
select array_collect_sort(val) from aggdata;
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -251,14 +251,14 @@ SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'add';
$$);
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'sum2';
$$);