From c32bd459f45464b92180ca9d687bb68860ba4cff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 27 Sep 2019 18:26:07 +0000 Subject: [PATCH] create_distributed_function: accept aggregates --- .../distributed/commands/dependencies.c | 36 +- src/backend/distributed/commands/function.c | 447 +++++++++++++++++- src/backend/distributed/commands/schema.c | 2 + .../distributed/commands/utility_hook.c | 3 + src/backend/distributed/deparser/deparse.c | 5 + .../deparser/deparse_function_stmts.c | 47 +- .../distributed/deparser/objectaddress.c | 3 + .../deparser/qualify_function_stmt.c | 33 +- src/backend/distributed/utils/ruleutils_11.c | 3 +- src/backend/distributed/utils/ruleutils_12.c | 3 +- src/include/distributed/citus_ruleutils.h | 1 + src/include/distributed/deparser.h | 2 + .../expected/distributed_functions.out | 103 +++- .../regress/sql/distributed_functions.sql | 88 +++- 14 files changed, 686 insertions(+), 90 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 40558fe8e..7bda42c35 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -144,24 +144,6 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) { switch (getObjectClass(dependency)) { - case OCLASS_SCHEMA: - { - const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId); - - if (schemaDDLCommand == NULL) - { - /* no schema to create */ - return NIL; - } - - return list_make1((void *) schemaDDLCommand); - } - - case OCLASS_TYPE: - { - return CreateTypeDDLCommandsIdempotent(dependency); - } - case OCLASS_CLASS: { /* @@ -182,6 +164,24 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) return CreateFunctionDDLCommandsIdempotent(dependency); } + case OCLASS_SCHEMA: + { + const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId); + + if (schemaDDLCommand == NULL) + { + /* no schema to create */ + return NIL; + } + + return list_make1((void *) schemaDDLCommand); + } + + case OCLASS_TYPE: + { + return CreateTypeDDLCommandsIdempotent(dependency); + } + default: { break; diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index a02fd2e5d..7fa02d9f7 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -24,10 +24,12 @@ #endif #include "access/htup_details.h" #include "access/xact.h" +#include "catalog/pg_aggregate.h" #include "catalog/namespace.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/extension.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" @@ -54,6 +56,7 @@ (strncmp(arg, prefix, strlen(prefix)) == 0) /* forward declaration for helper functions*/ +static char * GetAggregateDDLCommand(const RegProcedure funcOid); static char * GetFunctionDDLCommand(const RegProcedure funcOid); static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid); static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, @@ -75,12 +78,14 @@ static ObjectAddress * FunctionToObjectAddress(ObjectType objectType, bool missing_ok); static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt); static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress); +static char * quote_qualified_func_name(Oid funcOid); PG_FUNCTION_INFO_V1(create_distributed_function); #define AssertIsFunctionOrProcedure(objtype) \ - Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE) + Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE || (objtype) == \ + OBJECT_AGGREGATE) /* @@ -537,27 +542,33 @@ GetFunctionDDLCommand(const RegProcedure funcOid) StringInfo ddlCommand = makeStringInfo(); OverrideSearchPath *overridePath = NULL; - Datum sqlTextDatum = 0; char *createFunctionSQL = NULL; char *alterFunctionOwnerSQL = NULL; - /* - * Set search_path to NIL so that all objects outside of pg_catalog will be - * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; - */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + createFunctionSQL = GetAggregateDDLCommand(funcOid); - sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, - ObjectIdGetDatum(funcOid)); + if (createFunctionSQL == NULL) + { + Datum sqlTextDatum = (Datum) 0; - /* revert back to original search_path */ - PopOverrideSearchPath(); + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + + PushOverrideSearchPath(overridePath); + sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, + ObjectIdGetDatum(funcOid)); + createFunctionSQL = TextDatumGetCString(sqlTextDatum); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + } - createFunctionSQL = TextDatumGetCString(sqlTextDatum); alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid); appendStringInfo(ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL); @@ -575,7 +586,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) { HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid)); StringInfo alterCommand = makeStringInfo(); - bool isProcedure = false; + char *kindString = "FUNCTION"; Oid procOwner = InvalidOid; char *functionSignature = NULL; @@ -592,7 +603,14 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) procOwner = procform->proowner; - isProcedure = procform->prokind == PROKIND_PROCEDURE; + if (procform->prokind == PROKIND_PROCEDURE) + { + kindString = "PROCEDURE"; + } + else if (procform->prokind == PROKIND_AGGREGATE) + { + kindString = "AGGREGATE"; + } ReleaseSysCache(proctup); } @@ -628,7 +646,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) functionOwner = GetUserNameFromId(procOwner, false); appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;", - (isProcedure ? "PROCEDURE" : "FUNCTION"), + kindString, functionSignature, quote_identifier(functionOwner)); @@ -636,6 +654,387 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) } +/* + * GetAggregateDDLCommand returns a string for creating an aggregate. + * Returns NULL if funcOid is not an aggregate. + */ +static char * +GetAggregateDDLCommand(const RegProcedure funcOid) +{ + StringInfoData buf = { 0 }; +#if PG_VERSION_NUM < 120000 + + /* + * CREATE OR REPLACE AGGREGATE only introduced in pg12, + * so we hack around that with a DO block & exception handler. + * Requires some care to pick the correct $delimiter$ + */ + StringInfoData resultbuf = { 0 }; +#endif + HeapTuple proctup = NULL; + Form_pg_proc proc = NULL; + HeapTuple aggtup = NULL; + Form_pg_aggregate agg = NULL; + const char *name = NULL; + const char *nsp = NULL; + int numargs = 0; + int i = 0; + Oid *argtypes = NULL; + char **argnames = NULL; + char *argmodes = NULL; + int insertorderbyat = -1; + int argsprinted = 0; + int inputargno = 0; + + proctup = SearchSysCache1(PROCOID, funcOid); + if (!HeapTupleIsValid(proctup)) + { + goto early_exit; + } + + proc = (Form_pg_proc) GETSTRUCT(proctup); + + if (proc->prokind != PROKIND_AGGREGATE) + { + goto early_exit; + } + + initStringInfo(&buf); + + name = NameStr(proc->proname); + nsp = get_namespace_name(proc->pronamespace); + +#if PG_VERSION_NUM >= 120000 + appendStringInfo(&buf, "CREATE OR REPLACE AGGREGATE %s(", + quote_qualified_identifier(nsp, name)); +#else + appendStringInfo(&buf, "CREATE AGGREGATE %s(", + quote_qualified_identifier(nsp, name)); +#endif + + /* Parameters, orrows heavily from print_function_arguments in postgres */ + numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes); + + aggtup = SearchSysCache1(AGGFNOID, funcOid); + if (!HeapTupleIsValid(aggtup)) + { + goto early_exit; + } + agg = (Form_pg_aggregate) GETSTRUCT(aggtup); + + if (AGGKIND_IS_ORDERED_SET(agg->aggkind)) + { + insertorderbyat = agg->aggnumdirectargs; + } + + for (i = 0; i < numargs; i++) + { + Oid argtype = argtypes[i]; + char *argname = argnames ? argnames[i] : NULL; + char argmode = argmodes ? argmodes[i] : PROARGMODE_IN; + const char *modename; + + switch (argmode) + { + case PROARGMODE_IN: + { + modename = ""; + break; + } + + case PROARGMODE_VARIADIC: + { + modename = "VARIADIC "; + break; + } + + default: + { + elog(ERROR, "unexpected parameter mode '%c'", argmode); + modename = NULL; + break; + } + } + + inputargno++; /* this is a 1-based counter */ + if (argsprinted == insertorderbyat) + { + appendStringInfoString(&buf, " ORDER BY"); + } + else if (argsprinted) + { + appendStringInfoString(&buf, ", "); + } + + appendStringInfoString(&buf, modename); + + if (argname && argname[0]) + { + appendStringInfo(&buf, "%s ", quote_identifier(argname)); + } + + appendStringInfoString(&buf, format_type_be(argtype)); + + argsprinted++; + + /* nasty hack: print the last arg twice for variadic ordered-set agg */ + if (argsprinted == insertorderbyat && i == numargs - 1) + { + i--; + } + } + + appendStringInfoString(&buf, ") ("); + + appendStringInfo(&buf, "STYPE = %s,SFUNC = %s", + format_type_be_qualified(agg->aggtranstype), + quote_qualified_func_name(agg->aggtransfn)); + + if (agg->aggtransspace != 0) + { + appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace); + } + + if (agg->aggfinalfn != InvalidOid) + { + const char *finalmodifystring = NULL; + switch (agg->aggfinalmodify) + { + case AGGMODIFY_READ_ONLY: + { + finalmodifystring = "READ_ONLY"; + break; + } + + case AGGMODIFY_SHAREABLE: + { + finalmodifystring = "SHAREABLE"; + break; + } + + case AGGMODIFY_READ_WRITE: + { + finalmodifystring = "READ_WRITE"; + break; + } + } + + appendStringInfo(&buf, ", FINALFUNC = %s", + quote_qualified_func_name(agg->aggfinalfn)); + + if (finalmodifystring != NULL) + { + appendStringInfo(&buf, ", FINALFUNC_MODIFY = %s", finalmodifystring); + } + + if (agg->aggfinalextra) + { + appendStringInfoString(&buf, ", FINALFUNC_EXTRA"); + } + } + + if (agg->aggmtransspace != 0) + { + appendStringInfo(&buf, ", MSSPACE = %d", agg->aggmtransspace); + } + + if (agg->aggmfinalfn) + { + const char *mfinalmodifystring = NULL; + switch (agg->aggfinalmodify) + { + case AGGMODIFY_READ_ONLY: + { + mfinalmodifystring = "READ_ONLY"; + break; + } + + case AGGMODIFY_SHAREABLE: + { + mfinalmodifystring = "SHAREABLE"; + break; + } + + case AGGMODIFY_READ_WRITE: + { + mfinalmodifystring = "READ_WRITE"; + break; + } + } + + appendStringInfo(&buf, ", MFINALFUNC = %s", + quote_qualified_func_name(agg->aggmfinalfn)); + + if (mfinalmodifystring != NULL) + { + appendStringInfo(&buf, ", MFINALFUNC_MODIFY = %s", mfinalmodifystring); + } + + if (agg->aggmfinalextra) + { + appendStringInfoString(&buf, ", MFINALFUNC_EXTRA"); + } + } + + if (agg->aggmtransfn) + { + appendStringInfo(&buf, ", MSFUNC = %s", + quote_qualified_func_name(agg->aggmtransfn)); + + if (agg->aggmtranstype) + { + appendStringInfo(&buf, ", MSTYPE = %s", + format_type_be_qualified(agg->aggmtranstype)); + } + } + + if (agg->aggtransspace != 0) + { + appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace); + } + + if (agg->aggminvtransfn) + { + appendStringInfo(&buf, ", MINVFUNC = %s", + quote_qualified_func_name(agg->aggminvtransfn)); + } + + if (agg->aggcombinefn) + { + appendStringInfo(&buf, ", COMBINEFUNC = %s", + quote_qualified_func_name(agg->aggcombinefn)); + } + + if (agg->aggserialfn) + { + appendStringInfo(&buf, ", SERIALFUNC = %s", + quote_qualified_func_name(agg->aggserialfn)); + } + + if (agg->aggdeserialfn) + { + appendStringInfo(&buf, ", DESERIALFUNC = %s", + quote_qualified_func_name(agg->aggdeserialfn)); + } + + if (agg->aggsortop != InvalidOid) + { + appendStringInfo(&buf, ", SORTOP = %s", + generate_operator_name(agg->aggsortop, argtypes[0], + argtypes[0])); + } + + { + const char *parallelstring = NULL; + switch (proc->proparallel) + { + case PROPARALLEL_SAFE: + { + parallelstring = "SAFE"; + break; + } + + case PROPARALLEL_RESTRICTED: + { + parallelstring = "RESTRICTED"; + break; + } + + case PROPARALLEL_UNSAFE: + { + break; + } + + default: + elog(WARNING, "Unknown parallel option, ignoring: %c", proc->proparallel); + } + + if (parallelstring != NULL) + { + appendStringInfo(&buf, ", PARALLEL = %s", parallelstring); + } + } + + { + bool isNull = false; + Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup, + Anum_pg_aggregate_agginitval, + &isNull); + if (!isNull) + { + char *strInitVal = TextDatumGetCString(textInitVal); + char *strInitValQuoted = quote_literal_cstr(strInitVal); + + appendStringInfo(&buf, ", INITCOND = %s", strInitValQuoted); + + pfree(strInitValQuoted); + pfree(strInitVal); + } + } + + { + bool isNull = false; + Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup, + Anum_pg_aggregate_aggminitval, + &isNull); + if (!isNull) + { + char *strInitVal = TextDatumGetCString(textInitVal); + char *strInitValQuoted = quote_literal_cstr(strInitVal); + + appendStringInfo(&buf, ", MINITCOND = %s", strInitValQuoted); + + pfree(strInitValQuoted); + pfree(strInitVal); + } + } + + if (agg->aggkind == AGGKIND_HYPOTHETICAL) + { + appendStringInfoString(&buf, ", HYPOTHETICAL"); + } + + appendStringInfoChar(&buf, ')'); + +#if PG_VERSION_NUM < 120000 + { + /* construct a dollar quoted string, making sure the quoted text doesn't contain the dollar quote */ + StringInfoData delim; + initStringInfo(&delim); + initStringInfo(&resultbuf); + + appendStringInfoChar(&delim, '$'); + while (strstr(buf.data, delim.data) != NULL) + { + appendStringInfoChar(&delim, 'z'); + } + + appendStringInfo(&resultbuf, + "DO %s$BEGIN %s;EXCEPTION WHEN duplicate_function THEN NULL;END%s$", + delim.data, buf.data, delim.data); + + pfree(delim.data); + pfree(buf.data); + } +#endif + +early_exit: + if (aggtup && HeapTupleIsValid(aggtup)) + { + ReleaseSysCache(aggtup); + } + if (proctup && HeapTupleIsValid(proctup)) + { + ReleaseSysCache(proctup); + } +#if PG_VERSION_NUM >= 120000 + return buf.data; +#else + return resultbuf.data; +#endif +} + + /* * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in * sequential mode, or can still safely be put in sequential mode, it errors if that is @@ -1410,3 +1809,13 @@ ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress) extensionName))); } } + + +/* returns the quoted qualified name of a given function oid */ +static char * +quote_qualified_func_name(Oid funcOid) +{ + return quote_qualified_identifier( + get_namespace_name(get_func_namespace(funcOid)), + get_func_name(funcOid)); +} diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 90191a910..d6e19335b 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -122,6 +122,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return PlanAlterFunctionSchemaStmt(stmt, queryString); @@ -197,6 +198,7 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { ProcessAlterFunctionSchemaStmt(stmt, queryString); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index f53c8d1f6..41baa4e1d 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -426,6 +426,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { ddlJobs = PlanDropFunctionStmt(dropStatement, queryString); @@ -482,6 +483,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString); @@ -839,6 +841,7 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return PlanAlterFunctionOwnerStmt(stmt, queryString); diff --git a/src/backend/distributed/deparser/deparse.c b/src/backend/distributed/deparser/deparse.c index d9b814c46..3666a8432 100644 --- a/src/backend/distributed/deparser/deparse.c +++ b/src/backend/distributed/deparser/deparse.c @@ -121,6 +121,7 @@ DeparseDropStmt(DropStmt *stmt) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return DeparseDropFunctionStmt(stmt); @@ -187,6 +188,7 @@ DeparseRenameStmt(RenameStmt *stmt) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return DeparseRenameFunctionStmt(stmt); @@ -240,6 +242,7 @@ DeparseAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return DeparseAlterFunctionSchemaStmt(stmt); @@ -272,6 +275,7 @@ DeparseAlterOwnerStmt(AlterOwnerStmt *stmt) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return DeparseAlterFunctionOwnerStmt(stmt); @@ -299,6 +303,7 @@ DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt) switch (stmt->objectType) { case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return DeparseAlterFunctionDependsStmt(stmt); diff --git a/src/backend/distributed/deparser/deparse_function_stmts.c b/src/backend/distributed/deparser/deparse_function_stmts.c index ccc114f11..a528ba993 100644 --- a/src/backend/distributed/deparser/deparse_function_stmts.c +++ b/src/backend/distributed/deparser/deparse_function_stmts.c @@ -90,14 +90,17 @@ AppendAlterFunctionStmt(StringInfo buf, AlterFunctionStmt *stmt) { appendStringInfo(buf, "ALTER FUNCTION "); } - else + else if (stmt->objtype == OBJECT_PROCEDURE) { appendStringInfo(buf, "ALTER PROCEDURE "); } + else + { + appendStringInfo(buf, "ALTER AGGREGATE "); + } AppendFunctionName(buf, stmt->func, stmt->objtype); - foreach(actionCell, stmt->actions) { DefElem *def = castNode(DefElem, lfirst(actionCell)); @@ -298,7 +301,7 @@ DeparseRenameFunctionStmt(RenameStmt *stmt) StringInfoData str = { 0 }; initStringInfo(&str); - Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->renameType); AppendRenameFunctionStmt(&str, stmt); @@ -318,10 +321,14 @@ AppendRenameFunctionStmt(StringInfo buf, RenameStmt *stmt) { appendStringInfoString(buf, "ALTER FUNCTION "); } - else + else if (stmt->renameType == OBJECT_PROCEDURE) { appendStringInfoString(buf, "ALTER PROCEDURE "); } + else + { + appendStringInfoString(buf, "ALTER AGGREGATE "); + } AppendFunctionName(buf, func, stmt->renameType); @@ -338,7 +345,7 @@ DeparseAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt) StringInfoData str = { 0 }; initStringInfo(&str); - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); AppendAlterFunctionSchemaStmt(&str, stmt); @@ -358,10 +365,14 @@ AppendAlterFunctionSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) { appendStringInfoString(buf, "ALTER FUNCTION "); } - else + else if (stmt->objectType == OBJECT_PROCEDURE) { appendStringInfoString(buf, "ALTER PROCEDURE "); } + else + { + appendStringInfoString(buf, "ALTER AGGREGATE "); + } AppendFunctionName(buf, func, stmt->objectType); appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema)); @@ -377,7 +388,7 @@ DeparseAlterFunctionOwnerStmt(AlterOwnerStmt *stmt) StringInfoData str = { 0 }; initStringInfo(&str); - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); AppendAlterFunctionOwnerStmt(&str, stmt); @@ -397,10 +408,14 @@ AppendAlterFunctionOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) { appendStringInfoString(buf, "ALTER FUNCTION "); } - else + else if (stmt->objectType == OBJECT_PROCEDURE) { appendStringInfoString(buf, "ALTER PROCEDURE "); } + else + { + appendStringInfoString(buf, "ALTER AGGREGATE "); + } AppendFunctionName(buf, func, stmt->objectType); appendStringInfo(buf, " OWNER TO %s;", RoleSpecString(stmt->newowner)); @@ -416,7 +431,7 @@ DeparseAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt) StringInfoData str = { 0 }; initStringInfo(&str); - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); AppendAlterFunctionDependsStmt(&str, stmt); @@ -436,10 +451,14 @@ AppendAlterFunctionDependsStmt(StringInfo buf, AlterObjectDependsStmt *stmt) { appendStringInfoString(buf, "ALTER FUNCTION "); } - else + else if (stmt->objectType == OBJECT_PROCEDURE) { appendStringInfoString(buf, "ALTER PROCEDURE "); } + else + { + appendStringInfoString(buf, "ALTER AGGREGATE "); + } AppendFunctionName(buf, func, stmt->objectType); appendStringInfo(buf, " DEPENDS ON EXTENSION %s;", strVal(stmt->extname)); @@ -455,7 +474,7 @@ DeparseDropFunctionStmt(DropStmt *stmt) StringInfoData str = { 0 }; initStringInfo(&str); - Assert(stmt->removeType == OBJECT_FUNCTION || stmt->removeType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->removeType); AppendDropFunctionStmt(&str, stmt); @@ -473,10 +492,14 @@ AppendDropFunctionStmt(StringInfo buf, DropStmt *stmt) { appendStringInfoString(buf, "DROP FUNCTION "); } - else + else if (stmt->removeType == OBJECT_PROCEDURE) { appendStringInfoString(buf, "DROP PROCEDURE "); } + else + { + appendStringInfoString(buf, "DROP AGGREGATE "); + } if (stmt->missing_ok) { diff --git a/src/backend/distributed/deparser/objectaddress.c b/src/backend/distributed/deparser/objectaddress.c index fb31e06b4..7b23bc596 100644 --- a/src/backend/distributed/deparser/objectaddress.c +++ b/src/backend/distributed/deparser/objectaddress.c @@ -144,6 +144,7 @@ RenameStmtObjectAddress(RenameStmt *stmt, bool missing_ok) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return RenameFunctionStmtObjectAddress(stmt, missing_ok); @@ -169,6 +170,7 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok); @@ -215,6 +217,7 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok) } case OBJECT_PROCEDURE: + case OBJECT_AGGREGATE: case OBJECT_FUNCTION: { return AlterFunctionOwnerObjectAddress(stmt, missing_ok); diff --git a/src/backend/distributed/deparser/qualify_function_stmt.c b/src/backend/distributed/deparser/qualify_function_stmt.c index f2481fbd1..6294a6a21 100644 --- a/src/backend/distributed/deparser/qualify_function_stmt.c +++ b/src/backend/distributed/deparser/qualify_function_stmt.c @@ -28,13 +28,22 @@ #include "utils/syscache.h" /* forward declaration for qualify functions */ -void QualifyFunction(ObjectWithArgs *func, ObjectType type); -void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type); +static void QualifyFunction(ObjectWithArgs *func, ObjectType type); +static void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type); + + +/* AssertObjectTypeIsFunctionType asserts we aren't receiving something we shouldn't */ +void +AssertObjectTypeIsFunctional(ObjectType type) +{ + Assert(type == OBJECT_AGGREGATE || type == OBJECT_FUNCTION || type == + OBJECT_PROCEDURE); +} /* * QualifyAlterFunctionStmt transforms a - * ALTER {FUNCTION|PROCEDURE} .. + * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. * statement in place and makes all (supported) statements fully qualified. * * Note that not all queries of this form are valid AlterFunctionStmt @@ -43,19 +52,21 @@ void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type); void QualifyAlterFunctionStmt(AlterFunctionStmt *stmt) { + AssertObjectTypeIsFunctional(stmt->objtype); + QualifyFunction(stmt->func, stmt->objtype); } /* * QualifyRenameFunctionStmt transforms a - * ALTER {FUNCTION|PROCEDURE} .. RENAME TO .. + * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. RENAME TO .. * statement in place and makes the function name fully qualified. */ void QualifyRenameFunctionStmt(RenameStmt *stmt) { - Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->renameType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->renameType); } @@ -63,13 +74,13 @@ QualifyRenameFunctionStmt(RenameStmt *stmt) /* * QualifyAlterFunctionSchemaStmt transforms a - * ALTER {FUNCTION|PROCEDURE} .. SET SCHEMA .. + * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. SET SCHEMA .. * statement in place and makes the function name fully qualified. */ void QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt) { - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); } @@ -77,13 +88,13 @@ QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt) /* * QualifyAlterFunctionOwnerStmt transforms a - * ALTER {FUNCTION|PROCEDURE} .. OWNER TO .. + * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. OWNER TO .. * statement in place and makes the function name fully qualified. */ void QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt) { - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); } @@ -91,13 +102,13 @@ QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt) /* * QualifyAlterFunctionDependsStmt transforms a - * ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSIOIN .. + * ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSION .. * statement in place and makes the function name fully qualified. */ void QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt) { - Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); + AssertObjectTypeIsFunctional(stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); } diff --git a/src/backend/distributed/utils/ruleutils_11.c b/src/backend/distributed/utils/ruleutils_11.c index 4b3e3ddba..816803ff8 100644 --- a/src/backend/distributed/utils/ruleutils_11.c +++ b/src/backend/distributed/utils/ruleutils_11.c @@ -434,7 +434,6 @@ static char *generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, bool has_variadic, bool *use_variadic_p, ParseExprKind special_exprkind); -static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); #define only_marker(rte) ((rte)->inh ? "" : "ONLY ") @@ -7878,7 +7877,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, * plus the OPERATOR() decoration needed to use a qualified operator name * in an expression. */ -static char * +char * generate_operator_name(Oid operid, Oid arg1, Oid arg2) { StringInfoData buf; diff --git a/src/backend/distributed/utils/ruleutils_12.c b/src/backend/distributed/utils/ruleutils_12.c index b786515ee..2bca08845 100644 --- a/src/backend/distributed/utils/ruleutils_12.c +++ b/src/backend/distributed/utils/ruleutils_12.c @@ -435,7 +435,6 @@ static char *generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, bool has_variadic, bool *use_variadic_p, ParseExprKind special_exprkind); -static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); #define only_marker(rte) ((rte)->inh ? "" : "ONLY ") @@ -7879,7 +7878,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, * plus the OPERATOR() decoration needed to use a qualified operator name * in an expression. */ -static char * +char * generate_operator_name(Oid operid, Oid arg1, Oid arg2) { StringInfoData buf; diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 0a8717a4e..470f45726 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -50,6 +50,7 @@ extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer); extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_qualified_relation_name(Oid relid); +extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2); #endif /* CITUS_RULEUTILS_H */ diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index e28a63d4c..6a64931f3 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -27,6 +27,8 @@ extern char * FormatCollateBE(Oid collate_oid); extern char * FormatCollateBEQualified(Oid collate_oid); extern char * FormatCollateExtended(Oid collid, bits16 flags); +extern void AssertObjectTypeIsFunctional(ObjectType type); + extern void QualifyTreeNode(Node *stmt); extern const char * DeparseTreeNode(Node *stmt); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 7942bf846..a638779b8 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -37,8 +37,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION add_text(text, text) RETURNS int - AS 'select $1::int + $2::int;' +-- $function$ is what postgres escapes functions with when deparsing +-- make sure $function$ doesn't cause invalid syntax +CREATE FUNCTION add_text(text, text) RETURNS text + AS 'select $function$test$function$ || $1::int || $2::int;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -70,6 +72,41 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +-- Include aggregate function case +CREATE FUNCTION agg_sfunc(state int, item int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state + item; +end; +$$; +CREATE FUNCTION agg_invfunc(state int, item int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state - item; +end; +$$; +CREATE FUNCTION agg_finalfunc(state int, extra int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state * 2; +end; +$$; +CREATE AGGREGATE sum2(int) ( + sfunc = agg_sfunc, + stype = int, + sspace = 8, + finalfunc = agg_finalfunc, + finalfunc_extra, + initcond = '5', + msfunc = agg_sfunc, + mstype = int, + msspace = 12, + minvfunc = agg_invfunc, + mfinalfunc = agg_finalfunc, + mfinalfunc_extra, + minitcond = '1', + sortop = ">" +); -- make sure to propagate ddl propagation after we have setup our functions, this will -- allow alter statements to be propagated and keep the functions in sync across machines SET citus.enable_ddl_propagation TO on; @@ -152,6 +189,13 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); t (1 row) +-- distribute aggregate +SELECT create_distributed_function('sum2(int)'); + create_distributed_function +----------------------------- + +(1 row) + -- testing alter statements for a distributed function -- ROWS 5, untested because; -- ERROR: ROWS is not applicable when function does not return a set @@ -256,6 +300,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER B (2 rows) ALTER FUNCTION add2(int,int) RENAME TO add; +ALTER AGGREGATE sum2(int) RENAME TO sum27; +SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 1 + localhost | 57638 | t | 1 +(2 rows) + +ALTER AGGREGATE sum27(int) RENAME TO sum2; -- change the owner of the function and verify the owner has been changed on the workers ALTER FUNCTION add(int,int) OWNER TO functionuser; SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); @@ -264,6 +317,7 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); t (1 row) +ALTER AGGREGATE sum2(int) OWNER TO functionuser; SELECT run_command_on_workers($$ SELECT row(usename, nspname, proname) FROM pg_proc @@ -277,6 +331,19 @@ $$); (localhost,57638,t,"(functionuser,function_tests,add)") (2 rows) +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'sum2'; +$$); + run_command_on_workers +---------------------------------------------------------- + (localhost,57637,t,"(functionuser,function_tests,sum2)") + (localhost,57638,t,"(functionuser,function_tests,sum2)") +(2 rows) + -- change the schema of the function and verify the old schema doesn't exist anymore while -- the new schema has the function. ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; @@ -301,6 +368,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B (2 rows) ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded @@ -337,6 +405,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist (2 rows) +DROP AGGREGATE function_tests2.sum2(int); +-- call should fail as aggregate should have been dropped +SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+--------------------------------------------------------------- + localhost | 57637 | f | ERROR: function function_tests2.sum2(integer) does not exist + localhost | 57638 | f | ERROR: function function_tests2.sum2(integer) does not exist +(2 rows) + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); ERROR: syntax error at or near "int" @@ -442,7 +519,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)','$1'); (1 row) --- a function cannot be colocated with a table that is not "streaming" replicated +-- a function cannot be colocated with a table that is not "streaming" replicated SET citus.shard_replication_factor TO 2; CREATE TABLE replicated_table_func_test (a int); SET citus.replication_model TO "statement"; @@ -508,8 +585,8 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc -- show that the colocationIds are the same SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; table_and_function_colocated ------------------------------ @@ -517,7 +594,7 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass (1 row) -- now, re-distributed with the default colocation option, we should still see that the same colocation --- group preserved, because we're using the default shard creationg settings +-- group preserved, because we're using the default shard creation settings SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); create_distributed_function ----------------------------- @@ -525,15 +602,15 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); (1 row) SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; table_and_function_colocated ------------------------------ t (1 row) --- function with a numeric dist. arg can be colocated with int +-- function with a numeric dist. arg can be colocated with int -- column of a distributed table. In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions -- to coerce the values @@ -544,8 +621,8 @@ SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', coloca (1 row) SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; table_and_function_colocated ------------------------------ @@ -559,8 +636,8 @@ SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:= (1 row) SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_text(text, text)'::regprocedure; table_and_function_colocated ------------------------------ diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 65b592b3b..ad5d7eff2 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -32,8 +32,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION add_text(text, text) RETURNS int - AS 'select $1::int + $2::int;' +-- $function$ is what postgres escapes functions with when deparsing +-- make sure $function$ doesn't cause invalid syntax +CREATE FUNCTION add_text(text, text) RETURNS text + AS 'select $function$test$function$ || $1::int || $2::int;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -73,6 +75,45 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; +-- Include aggregate function case +CREATE FUNCTION agg_sfunc(state int, item int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state + item; +end; +$$; + +CREATE FUNCTION agg_invfunc(state int, item int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state - item; +end; +$$; + +CREATE FUNCTION agg_finalfunc(state int, extra int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state * 2; +end; +$$; + +CREATE AGGREGATE sum2(int) ( + sfunc = agg_sfunc, + stype = int, + sspace = 8, + finalfunc = agg_finalfunc, + finalfunc_extra, + initcond = '5', + msfunc = agg_sfunc, + mstype = int, + msspace = 12, + minvfunc = agg_invfunc, + mfinalfunc = agg_finalfunc, + mfinalfunc_extra, + minitcond = '1', + sortop = ">" +); + -- make sure to propagate ddl propagation after we have setup our functions, this will -- allow alter statements to be propagated and keep the functions in sync across machines SET citus.enable_ddl_propagation TO on; @@ -105,6 +146,9 @@ SELECT create_distributed_function('add(int,int)', '$1'); SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +-- distribute aggregate +SELECT create_distributed_function('sum2(int)'); + -- testing alter statements for a distributed function -- ROWS 5, untested because; -- ERROR: ROWS is not applicable when function does not return a set @@ -140,9 +184,14 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; ALTER FUNCTION add2(int,int) RENAME TO add; +ALTER AGGREGATE sum2(int) RENAME TO sum27; +SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2; +ALTER AGGREGATE sum27(int) RENAME TO sum2; + -- change the owner of the function and verify the owner has been changed on the workers ALTER FUNCTION add(int,int) OWNER TO functionuser; SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER AGGREGATE sum2(int) OWNER TO functionuser; SELECT run_command_on_workers($$ SELECT row(usename, nspname, proname) FROM pg_proc @@ -150,6 +199,13 @@ JOIN pg_user ON (usesysid = proowner) JOIN pg_namespace ON (pg_namespace.oid = pronamespace) WHERE proname = 'add'; $$); +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'sum2'; +$$); -- change the schema of the function and verify the old schema doesn't exist anymore while -- the new schema has the function. @@ -159,6 +215,8 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; + -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded @@ -177,6 +235,10 @@ DROP FUNCTION add(int,int); -- call should fail as function should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +DROP AGGREGATE function_tests2.sum2(int); +-- call should fail as aggregate should have been dropped +SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2; + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); @@ -230,7 +292,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', distributio -- valid distribution with distribution_arg_index SELECT create_distributed_function('add_with_param_names(int, int)','$1'); --- a function cannot be colocated with a table that is not "streaming" replicated +-- a function cannot be colocated with a table that is not "streaming" replicated SET citus.shard_replication_factor TO 2; CREATE TABLE replicated_table_func_test (a int); SET citus.replication_model TO "statement"; @@ -265,32 +327,32 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc -- show that the colocationIds are the same SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; -- now, re-distributed with the default colocation option, we should still see that the same colocation --- group preserved, because we're using the default shard creationg settings +-- group preserved, because we're using the default shard creation settings SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; --- function with a numeric dist. arg can be colocated with int +-- function with a numeric dist. arg can be colocated with int -- column of a distributed table. In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions -- to coerce the values SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:='replicated_table_func_test_4'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated -FROM pg_dist_partition, citus.pg_dist_object as objects -WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND +FROM pg_dist_partition, citus.pg_dist_object as objects +WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_text(text, text)'::regprocedure; -- cannot distribute function because there is no