From 070ffc785f390805b0d305d41617b4cf44750a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 15 Oct 2019 19:00:35 +0000 Subject: [PATCH] Fix compile issues --- src/backend/distributed/commands/function.c | 242 +++++++++--------- .../planner/multi_logical_optimizer.c | 49 ++-- src/include/distributed/commands.h | 6 + .../distributed/metadata/pg_dist_object.h | 2 +- 4 files changed, 152 insertions(+), 147 deletions(-) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index d2e7bb09a..b5c00fd9a 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -87,7 +87,7 @@ static char * quote_qualified_func_name(Oid funcOid); PG_FUNCTION_INFO_V1(create_distributed_function); -PG_FUNCTION_INFO_v1(mark_aggregate_for_distributed_execution); +PG_FUNCTION_INFO_V1(mark_aggregate_for_distributed_execution); #define AssertIsFunctionOrProcedure(objtype) \ Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE || (objtype) == \ @@ -234,11 +234,10 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS) Form_pg_aggregate agg = NULL; HeapTuple proctup = SearchSysCache1(PROCOID, funcOid); HeapTuple aggtup = NULL; + Oid helperOid = InvalidOid; int numargs = 0; Oid *argtypes = NULL; - char **argnames = NULL; - char *argmodes = NULL; - FuncCandidateList clist; + if (!HeapTupleIsValid(proctup)) { @@ -259,110 +258,15 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS) agg = (Form_pg_aggregate) GETSTRUCT(aggtup); initStringInfo(&helperSuffix); - - switch (proc->proparallel) - { - case PROPARALLEL_SAFE: - { - appendStringInfoString(&helperSuffix, "_ps"); - break; - } - - case PROPARALLEL_RESTRICTED: - { - appendStringInfoString(&helperSuffix, "_pr"); - break; - } - - case PROPARALLEL_UNSAFE: - { - appendStringInfoString(&helperSuffix, "_pu"); - break; - } - } - - if (agg->aggfinalfn != InvalidOid) - { - switch (agg->aggfinalmodify) - { - case AGGMODIFY_READ_ONLY: - { - appendStringInfoString(&helperSuffix, "_ro"); - break; - } - - case AGGMODIFY_SHAREABLE: - { - appendStringInfoString(&helperSuffix, "_rs"); - break; - } - - case AGGMODIFY_READ_WRITE: - { - appendStringInfoString(&helperSuffix, "_rw"); - break; - } - } - - if (agg->aggfinalextra) - { - appendStringInfoString(&helperSuffix, "_fx"); - } - } - - if (agg->aggmfinalfn != InvalidOid) - { - appendStringInfoString(&helperSuffix, "_m"); - - switch (agg->aggmfinalmodify) - { - case AGGMODIFY_READ_ONLY: - { - appendStringInfoString(&helperSuffix, "_ro"); - break; - } - - case AGGMODIFY_SHAREABLE: - { - appendStringInfoString(&helperSuffix, "_rs"); - break; - } - - case AGGMODIFY_READ_WRITE: - { - appendStringInfoString(&helperSuffix, "_rw"); - break; - } - } - - if (agg->aggmfinalextra) - { - appendStringInfoString(&helperSuffix, "_fx"); - } - } - - /* Parameters, borrows heavily from print_function_arguments in postgres */ + appendStringInfoAggregateHelperSuffix(&helperSuffix, proc, agg); /* coordinator_combine_agg */ initStringInfo(&helperName); appendStringInfo(&helperName, "%s%s", COORD_COMBINE_AGGREGATE_NAME, helperSuffix.data); + helperOid = AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes); - numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes); - clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString( - helperName.data)), proc->pronargs + 1, - NIL, false, false, true); - - for (; clist; clist = clist->next) - { - if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, numargs * - sizeof(numargs)) == 0) - { - break; - } - } - - if (clist == NULL) + if (helperOid == InvalidOid) { CreateAggregateHelper(helperName.data, COORD_COMBINE_AGGREGATE_NAME, proc, agg, numargs, argtypes); @@ -372,22 +276,9 @@ mark_aggregate_for_distributed_execution(PG_FUNCTION_ARGS) resetStringInfo(&helperName); appendStringInfo(&helperName, "%s%s", WORKER_PARTIAL_AGGREGATE_NAME, helperSuffix.data); + helperOid = AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes); - numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes); - clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString( - helperName.data)), proc->pronargs + 1, - NIL, false, false, true); - - for (; clist; clist = clist->next) - { - if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, numargs * - sizeof(numargs)) == 0) - { - break; - } - } - - if (clist == NULL) + if (helperOid == InvalidOid) { CreateAggregateHelper(helperName.data, WORKER_PARTIAL_AGGREGATE_NAME, proc, agg, numargs, argtypes); @@ -410,6 +301,123 @@ early_exit: } +/* + * AggregateHelperOid returns helper aggregate oid for given proc's HeapTuple + */ +Oid +AggregateHelperOid(char *helperName, HeapTuple proctup, int *numargs, Oid **argtypes) +{ + char **argnames = NULL; + char *argmodes = NULL; + FuncCandidateList clist; + + *numargs = get_func_arg_info(proctup, argtypes, &argnames, &argmodes); + clist = FuncnameGetCandidates(list_make2(makeString("citus"), makeString( + helperName)), *numargs + 1, + NIL, false, false, true); + + for (; clist; clist = clist->next) + { + if (clist->args[0] == OIDOID && memcmp(clist->args + 1, argtypes, *numargs * + sizeof(Oid)) == 0) + { + return clist->oid; + } + } + + return InvalidOid; +} + + +/* + * AggregateHelperName returns helper function name for a given aggregate. + */ +void appendStringInfoAggregateHelperSuffix(StringInfo helperSuffix, Form_pg_proc proc, Form_pg_aggregate agg) +{ + switch (proc->proparallel) + { + case PROPARALLEL_SAFE: + { + appendStringInfoString(helperSuffix, "_ps"); + break; + } + + case PROPARALLEL_RESTRICTED: + { + appendStringInfoString(helperSuffix, "_pr"); + break; + } + + case PROPARALLEL_UNSAFE: + { + appendStringInfoString(helperSuffix, "_pu"); + break; + } + } + + if (agg->aggfinalfn != InvalidOid) + { + switch (agg->aggfinalmodify) + { + case AGGMODIFY_READ_ONLY: + { + appendStringInfoString(helperSuffix, "_ro"); + break; + } + + case AGGMODIFY_SHAREABLE: + { + appendStringInfoString(helperSuffix, "_rs"); + break; + } + + case AGGMODIFY_READ_WRITE: + { + appendStringInfoString(helperSuffix, "_rw"); + break; + } + } + + if (agg->aggfinalextra) + { + appendStringInfoString(helperSuffix, "_fx"); + } + } + + if (agg->aggmfinalfn != InvalidOid) + { + appendStringInfoString(helperSuffix, "_m"); + + switch (agg->aggmfinalmodify) + { + case AGGMODIFY_READ_ONLY: + { + appendStringInfoString(helperSuffix, "_ro"); + break; + } + + case AGGMODIFY_SHAREABLE: + { + appendStringInfoString(helperSuffix, "_rs"); + break; + } + + case AGGMODIFY_READ_WRITE: + { + appendStringInfoString(helperSuffix, "_rw"); + break; + } + } + + if (agg->aggmfinalextra) + { + appendStringInfoString(helperSuffix, "_fx"); + } + } + +} + + /* * CreateAggregateHelper creates helper aggregates across nodes */ diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 865280a1a..28347733a 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -28,6 +28,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" +#include "distributed/commands.h" #include "distributed/extended_op_node_utils.h" #include "distributed/function_utils.h" #include "distributed/metadata_cache.h" @@ -3140,43 +3141,33 @@ AggregateFunctionOid(const char *functionName, Oid inputType) static Oid AggregateFunctionHelperOid(const char *helperPrefix, Oid aggOid) { - Oid functionOid = InvalidOid; - Relation procRelation = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - HeapTuple heapTuple = NULL; + StringInfoData helperName; + int numargs; + Oid *argtypes; + HeapTuple proctup; + HeapTuple aggtup; + Form_pg_proc proc; + Form_pg_aggregate agg; - procRelation = heap_open(ProcedureRelationId, AccessShareLock); - - ScanKeyInit(&scanKey[0], Anum_pg_proc_proname, - BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(functionName)); - - scanDescriptor = systable_beginscan(procRelation, - ProcedureNameArgsNspIndexId, true, - NULL, scanKeyCount, scanKey); - - /* loop until we find the right function */ - heapTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(heapTuple)) + proctup = SearchSysCache1(PROCOID, aggOid); + if (!HeapTupleIsValid(proctup)) { -#if PG_VERSION_NUM < 120000 - functionOid = HeapTupleGetOid(heapTuple); -#else - Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(heapTuple); - functionOid = procForm->oid; -#endif + return InvalidOid; } + proc = (Form_pg_proc) GETSTRUCT(proctup); - if (functionOid == InvalidOid) + aggtup = SearchSysCache1(AGGFNOID, aggOid); + if (!HeapTupleIsValid(aggtup)) { - ereport(ERROR, (errmsg("no matching oid for function: %s", functionName))); + return InvalidOid; } + agg = (Form_pg_aggregate) GETSTRUCT(aggtup); - systable_endscan(scanDescriptor); - heap_close(procRelation, AccessShareLock); + initStringInfo(&helperName); + appendStringInfoString(&helperName, helperPrefix); + appendStringInfoAggregateHelperSuffix(&helperName, proc, agg); - return functionOid; + return AggregateHelperOid(helperName.data, proctup, &numargs, &argtypes); } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 9395ca315..52de313b1 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -15,6 +15,8 @@ #include "postgres.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_proc.h" #include "utils/rel.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" @@ -48,6 +50,10 @@ extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId); /* function.c - forward declarations */ +extern void appendStringInfoAggregateHelperSuffix(StringInfo helperSuffix, + Form_pg_proc proc, + Form_pg_aggregate agg); +extern Oid AggregateHelperOid(char *helperName, HeapTuple proctup, int *numargs, Oid **argtypes); extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString); extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString); diff --git a/src/include/distributed/metadata/pg_dist_object.h b/src/include/distributed/metadata/pg_dist_object.h index 4d41d4d01..6926c8316 100644 --- a/src/include/distributed/metadata/pg_dist_object.h +++ b/src/include/distributed/metadata/pg_dist_object.h @@ -50,7 +50,7 @@ typedef FormData_pg_dist_object *Form_pg_dist_object; * compiler constants for pg_dist_object * ---------------- */ -#define Natts_pg_dist_object 8 +#define Natts_pg_dist_object 9 #define Anum_pg_dist_object_classid 1 #define Anum_pg_dist_object_objid 2 #define Anum_pg_dist_object_objsubid 3