Fix compile issues

fix_120_custom_aggregates_distribute_multiarg
Philip Dubé 2019-10-15 19:00:35 +00:00
parent 1545cb312c
commit 070ffc785f
4 changed files with 152 additions and 147 deletions

View File

@ -87,7 +87,7 @@ static char * quote_qualified_func_name(Oid funcOid);
PG_FUNCTION_INFO_V1(create_distributed_function); PG_FUNCTION_INFO_V1(create_distributed_function);
PG_FUNCTION_INFO_v1(mark_aggregate_for_distributed_execution); PG_FUNCTION_INFO_V1(mark_aggregate_for_distributed_execution);
#define AssertIsFunctionOrProcedure(objtype) \ #define AssertIsFunctionOrProcedure(objtype) \
Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE || (objtype) == \ Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE || (objtype) == \
@ -234,11 +234,10 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS)
Form_pg_aggregate agg = NULL; Form_pg_aggregate agg = NULL;
HeapTuple proctup = SearchSysCache1(PROCOID, funcOid); HeapTuple proctup = SearchSysCache1(PROCOID, funcOid);
HeapTuple aggtup = NULL; HeapTuple aggtup = NULL;
Oid helperOid = InvalidOid;
int numargs = 0; int numargs = 0;
Oid *argtypes = NULL; Oid *argtypes = NULL;
char **argnames = NULL;
char *argmodes = NULL;
FuncCandidateList clist;
if (!HeapTupleIsValid(proctup)) if (!HeapTupleIsValid(proctup))
{ {
@ -259,110 +258,15 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS)
agg = (Form_pg_aggregate) GETSTRUCT(aggtup); agg = (Form_pg_aggregate) GETSTRUCT(aggtup);
initStringInfo(&helperSuffix); initStringInfo(&helperSuffix);
appendStringInfoAggregateHelperSuffix(&helperSuffix, proc, agg);
switch (proc->proparallel)
{
case PROPARALLEL_SAFE:
{
appendStringInfoString(&helperSuffix, "_ps");
break;
}
case PROPARALLEL_RESTRICTED:
{
appendStringInfoString(&helperSuffix, "_pr");
break;
}
case PROPARALLEL_UNSAFE:
{
appendStringInfoString(&helperSuffix, "_pu");
break;
}
}
if (agg->aggfinalfn != InvalidOid)
{
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
appendStringInfoString(&helperSuffix, "_ro");
break;
}
case AGGMODIFY_SHAREABLE:
{
appendStringInfoString(&helperSuffix, "_rs");
break;
}
case AGGMODIFY_READ_WRITE:
{
appendStringInfoString(&helperSuffix, "_rw");
break;
}
}
if (agg->aggfinalextra)
{
appendStringInfoString(&helperSuffix, "_fx");
}
}
if (agg->aggmfinalfn != InvalidOid)
{
appendStringInfoString(&helperSuffix, "_m");
switch (agg->aggmfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
appendStringInfoString(&helperSuffix, "_ro");
break;
}
case AGGMODIFY_SHAREABLE:
{
appendStringInfoString(&helperSuffix, "_rs");
break;
}
case AGGMODIFY_READ_WRITE:
{
appendStringInfoString(&helperSuffix, "_rw");
break;
}
}
if (agg->aggmfinalextra)
{
appendStringInfoString(&helperSuffix, "_fx");
}
}
/* Parameters, borrows heavily from print_function_arguments in postgres */
/* coordinator_combine_agg */ /* coordinator_combine_agg */
initStringInfo(&helperName); initStringInfo(&helperName);
appendStringInfo(&helperName, "%s%s", COORD_COMBINE_AGGREGATE_NAME, appendStringInfo(&helperName, "%s%s", COORD_COMBINE_AGGREGATE_NAME,
helperSuffix.data); helperSuffix.data);
helperOid = AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes);
numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes); if (helperOid == InvalidOid)
clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString(
helperName.data)), proc->pronargs + 1,
NIL, false, false, true);
for (; clist; clist = clist->next)
{
if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, numargs *
sizeof(numargs)) == 0)
{
break;
}
}
if (clist == NULL)
{ {
CreateAggregateHelper(helperName.data, COORD_COMBINE_AGGREGATE_NAME, proc, agg, CreateAggregateHelper(helperName.data, COORD_COMBINE_AGGREGATE_NAME, proc, agg,
numargs, argtypes); numargs, argtypes);
@ -372,22 +276,9 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS)
resetStringInfo(&helperName); resetStringInfo(&helperName);
appendStringInfo(&helperName, "%s%s", WORKER_PARTIAL_AGGREGATE_NAME, appendStringInfo(&helperName, "%s%s", WORKER_PARTIAL_AGGREGATE_NAME,
helperSuffix.data); helperSuffix.data);
helperOid = AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes);
numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes); if (helperOid == InvalidOid)
clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString(
helperName.data)), proc->pronargs + 1,
NIL, false, false, true);
for (; clist; clist = clist->next)
{
if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, numargs *
sizeof(numargs)) == 0)
{
break;
}
}
if (clist == NULL)
{ {
CreateAggregateHelper(helperName.data, WORKER_PARTIAL_AGGREGATE_NAME, proc, agg, CreateAggregateHelper(helperName.data, WORKER_PARTIAL_AGGREGATE_NAME, proc, agg,
numargs, argtypes); numargs, argtypes);
@ -410,6 +301,123 @@ early_exit:
} }
/*
* AggregateHelperOid returns helper aggregate oid for given proc's HeapTuple
*/
Oid
AggregateHelperOid(char *helperName, HeapTuple proctup, int *numargs, Oid **argtypes)
{
char **argnames = NULL;
char *argmodes = NULL;
FuncCandidateList clist;
*numargs = get_func_arg_info(proctup, argtypes, &argnames, &argmodes);
clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString(
helperName)), *numargs + 1,
NIL, false, false, true);
for (; clist; clist = clist->next)
{
if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, *numargs *
sizeof(Oid)) == 0)
{
return clist->oid;
}
}
return InvalidOid;
}
/*
* AggregateHelperName returns helper function name for a given aggregate.
*/
void appendStringInfoAggregateHelperSuffix(StringInfo helperSuffix, Form_pg_proc proc, Form_pg_aggregate agg)
{
switch (proc->proparallel)
{
case PROPARALLEL_SAFE:
{
appendStringInfoString(helperSuffix, "_ps");
break;
}
case PROPARALLEL_RESTRICTED:
{
appendStringInfoString(helperSuffix, "_pr");
break;
}
case PROPARALLEL_UNSAFE:
{
appendStringInfoString(helperSuffix, "_pu");
break;
}
}
if (agg->aggfinalfn != InvalidOid)
{
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
appendStringInfoString(helperSuffix, "_ro");
break;
}
case AGGMODIFY_SHAREABLE:
{
appendStringInfoString(helperSuffix, "_rs");
break;
}
case AGGMODIFY_READ_WRITE:
{
appendStringInfoString(helperSuffix, "_rw");
break;
}
}
if (agg->aggfinalextra)
{
appendStringInfoString(helperSuffix, "_fx");
}
}
if (agg->aggmfinalfn != InvalidOid)
{
appendStringInfoString(helperSuffix, "_m");
switch (agg->aggmfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
appendStringInfoString(helperSuffix, "_ro");
break;
}
case AGGMODIFY_SHAREABLE:
{
appendStringInfoString(helperSuffix, "_rs");
break;
}
case AGGMODIFY_READ_WRITE:
{
appendStringInfoString(helperSuffix, "_rw");
break;
}
}
if (agg->aggmfinalextra)
{
appendStringInfoString(helperSuffix, "_fx");
}
}
}
/* /*
* CreateAggregateHelper creates helper aggregates across nodes * CreateAggregateHelper creates helper aggregates across nodes
*/ */

View File

@ -28,6 +28,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/extended_op_node_utils.h" #include "distributed/extended_op_node_utils.h"
#include "distributed/function_utils.h" #include "distributed/function_utils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -3140,43 +3141,33 @@ AggregateFunctionOid(const char *functionName, Oid inputType)
static Oid static Oid
AggregateFunctionHelperOid(const char *helperPrefix, Oid aggOid) AggregateFunctionHelperOid(const char *helperPrefix, Oid aggOid)
{ {
Oid functionOid = InvalidOid; StringInfoData helperName;
Relation procRelation = NULL; int numargs;
SysScanDesc scanDescriptor = NULL; Oid *argtypes;
ScanKeyData scanKey[1]; HeapTuple proctup;
int scanKeyCount = 1; HeapTuple aggtup;
HeapTuple heapTuple = NULL; Form_pg_proc proc;
Form_pg_aggregate agg;
procRelation = heap_open(ProcedureRelationId, AccessShareLock); proctup = SearchSysCache1(PROCOID, aggOid);
if (!HeapTupleIsValid(proctup))
ScanKeyInit(&scanKey[0], Anum_pg_proc_proname,
BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(functionName));
scanDescriptor = systable_beginscan(procRelation,
ProcedureNameArgsNspIndexId, true,
NULL, scanKeyCount, scanKey);
/* loop until we find the right function */
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{ {
#if PG_VERSION_NUM < 120000 return InvalidOid;
functionOid = HeapTupleGetOid(heapTuple);
#else
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(heapTuple);
functionOid = procForm->oid;
#endif
} }
proc = (Form_pg_proc) GETSTRUCT(proctup);
if (functionOid == InvalidOid) aggtup = SearchSysCache1(AGGFNOID, aggOid);
if (!HeapTupleIsValid(aggtup))
{ {
ereport(ERROR, (errmsg("no matching oid for function: %s", functionName))); return InvalidOid;
} }
agg = (Form_pg_aggregate) GETSTRUCT(aggtup);
systable_endscan(scanDescriptor); initStringInfo(&helperName);
heap_close(procRelation, AccessShareLock); appendStringInfoString(&helperName, helperPrefix);
appendStringInfoAggregateHelperSuffix(&helperName, proc, agg);
return functionOid; return AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes);
} }

View File

@ -15,6 +15,8 @@
#include "postgres.h" #include "postgres.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "tcop/dest.h" #include "tcop/dest.h"
@ -48,6 +50,10 @@ extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId);
/* function.c - forward declarations */ /* function.c - forward declarations */
extern void appendStringInfoAggregateHelperSuffix(StringInfo helperSuffix,
Form_pg_proc proc,
Form_pg_aggregate agg);
extern Oid AggregateHelperOid(char *helperName, HeapTuple proctup, int *numargs, Oid **argtypes);
extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString); extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString);
extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const
char *queryString); char *queryString);

View File

@ -50,7 +50,7 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
* compiler constants for pg_dist_object * compiler constants for pg_dist_object
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_object 8 #define Natts_pg_dist_object 9
#define Anum_pg_dist_object_classid 1 #define Anum_pg_dist_object_classid 1
#define Anum_pg_dist_object_objid 2 #define Anum_pg_dist_object_objid 2
#define Anum_pg_dist_object_objsubid 3 #define Anum_pg_dist_object_objsubid 3