create_distributed_function: accept aggregates

fix_120_custom_aggregates_distribute_multiarg
Philip Dubé 2019-09-27 18:26:07 +00:00
parent 210a6cc04b
commit c32bd459f4
14 changed files with 686 additions and 90 deletions

View File

@ -144,24 +144,6 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{ {
switch (getObjectClass(dependency)) switch (getObjectClass(dependency))
{ {
case OCLASS_SCHEMA:
{
const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
if (schemaDDLCommand == NULL)
{
/* no schema to create */
return NIL;
}
return list_make1((void *) schemaDDLCommand);
}
case OCLASS_TYPE:
{
return CreateTypeDDLCommandsIdempotent(dependency);
}
case OCLASS_CLASS: case OCLASS_CLASS:
{ {
/* /*
@ -182,6 +164,24 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return CreateFunctionDDLCommandsIdempotent(dependency); return CreateFunctionDDLCommandsIdempotent(dependency);
} }
case OCLASS_SCHEMA:
{
const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
if (schemaDDLCommand == NULL)
{
/* no schema to create */
return NIL;
}
return list_make1((void *) schemaDDLCommand);
}
case OCLASS_TYPE:
{
return CreateTypeDDLCommandsIdempotent(dependency);
}
default: default:
{ {
break; break;

View File

@ -24,10 +24,12 @@
#endif #endif
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
@ -54,6 +56,7 @@
(strncmp(arg, prefix, strlen(prefix)) == 0) (strncmp(arg, prefix, strlen(prefix)) == 0)
/* forward declaration for helper functions*/ /* forward declaration for helper functions*/
static char * GetAggregateDDLCommand(const RegProcedure funcOid);
static char * GetFunctionDDLCommand(const RegProcedure funcOid); static char * GetFunctionDDLCommand(const RegProcedure funcOid);
static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid); static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid);
static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
@ -75,12 +78,14 @@ static ObjectAddress * FunctionToObjectAddress(ObjectType objectType,
bool missing_ok); bool missing_ok);
static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt); static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt);
static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress); static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress);
static char * quote_qualified_func_name(Oid funcOid);
PG_FUNCTION_INFO_V1(create_distributed_function); PG_FUNCTION_INFO_V1(create_distributed_function);
#define AssertIsFunctionOrProcedure(objtype) \ #define AssertIsFunctionOrProcedure(objtype) \
Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE) Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE || (objtype) == \
OBJECT_AGGREGATE)
/* /*
@ -537,27 +542,33 @@ GetFunctionDDLCommand(const RegProcedure funcOid)
StringInfo ddlCommand = makeStringInfo(); StringInfo ddlCommand = makeStringInfo();
OverrideSearchPath *overridePath = NULL; OverrideSearchPath *overridePath = NULL;
Datum sqlTextDatum = 0;
char *createFunctionSQL = NULL; char *createFunctionSQL = NULL;
char *alterFunctionOwnerSQL = NULL; char *alterFunctionOwnerSQL = NULL;
/* createFunctionSQL = GetAggregateDDLCommand(funcOid);
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, if (createFunctionSQL == NULL)
ObjectIdGetDatum(funcOid)); {
Datum sqlTextDatum = (Datum) 0;
/* revert back to original search_path */ /*
PopOverrideSearchPath(); * Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef,
ObjectIdGetDatum(funcOid));
createFunctionSQL = TextDatumGetCString(sqlTextDatum);
/* revert back to original search_path */
PopOverrideSearchPath();
}
createFunctionSQL = TextDatumGetCString(sqlTextDatum);
alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid); alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
appendStringInfo(ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL); appendStringInfo(ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
@ -575,7 +586,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
{ {
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid)); HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid));
StringInfo alterCommand = makeStringInfo(); StringInfo alterCommand = makeStringInfo();
bool isProcedure = false; char *kindString = "FUNCTION";
Oid procOwner = InvalidOid; Oid procOwner = InvalidOid;
char *functionSignature = NULL; char *functionSignature = NULL;
@ -592,7 +603,14 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
procOwner = procform->proowner; procOwner = procform->proowner;
isProcedure = procform->prokind == PROKIND_PROCEDURE; if (procform->prokind == PROKIND_PROCEDURE)
{
kindString = "PROCEDURE";
}
else if (procform->prokind == PROKIND_AGGREGATE)
{
kindString = "AGGREGATE";
}
ReleaseSysCache(proctup); ReleaseSysCache(proctup);
} }
@ -628,7 +646,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
functionOwner = GetUserNameFromId(procOwner, false); functionOwner = GetUserNameFromId(procOwner, false);
appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;", appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;",
(isProcedure ? "PROCEDURE" : "FUNCTION"), kindString,
functionSignature, functionSignature,
quote_identifier(functionOwner)); quote_identifier(functionOwner));
@ -636,6 +654,387 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
} }
/*
* GetAggregateDDLCommand returns a string for creating an aggregate.
* Returns NULL if funcOid is not an aggregate.
*/
static char *
GetAggregateDDLCommand(const RegProcedure funcOid)
{
StringInfoData buf = { 0 };
#if PG_VERSION_NUM < 120000
/*
* CREATE OR REPLACE AGGREGATE only introduced in pg12,
* so we hack around that with a DO block & exception handler.
* Requires some care to pick the correct $delimiter$
*/
StringInfoData resultbuf = { 0 };
#endif
HeapTuple proctup = NULL;
Form_pg_proc proc = NULL;
HeapTuple aggtup = NULL;
Form_pg_aggregate agg = NULL;
const char *name = NULL;
const char *nsp = NULL;
int numargs = 0;
int i = 0;
Oid *argtypes = NULL;
char **argnames = NULL;
char *argmodes = NULL;
int insertorderbyat = -1;
int argsprinted = 0;
int inputargno = 0;
proctup = SearchSysCache1(PROCOID, funcOid);
if (!HeapTupleIsValid(proctup))
{
goto early_exit;
}
proc = (Form_pg_proc) GETSTRUCT(proctup);
if (proc->prokind != PROKIND_AGGREGATE)
{
goto early_exit;
}
initStringInfo(&buf);
name = NameStr(proc->proname);
nsp = get_namespace_name(proc->pronamespace);
#if PG_VERSION_NUM >= 120000
appendStringInfo(&buf, "CREATE OR REPLACE AGGREGATE %s(",
quote_qualified_identifier(nsp, name));
#else
appendStringInfo(&buf, "CREATE AGGREGATE %s(",
quote_qualified_identifier(nsp, name));
#endif
/* Parameters, orrows heavily from print_function_arguments in postgres */
numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes);
aggtup = SearchSysCache1(AGGFNOID, funcOid);
if (!HeapTupleIsValid(aggtup))
{
goto early_exit;
}
agg = (Form_pg_aggregate) GETSTRUCT(aggtup);
if (AGGKIND_IS_ORDERED_SET(agg->aggkind))
{
insertorderbyat = agg->aggnumdirectargs;
}
for (i = 0; i < numargs; i++)
{
Oid argtype = argtypes[i];
char *argname = argnames ? argnames[i] : NULL;
char argmode = argmodes ? argmodes[i] : PROARGMODE_IN;
const char *modename;
switch (argmode)
{
case PROARGMODE_IN:
{
modename = "";
break;
}
case PROARGMODE_VARIADIC:
{
modename = "VARIADIC ";
break;
}
default:
{
elog(ERROR, "unexpected parameter mode '%c'", argmode);
modename = NULL;
break;
}
}
inputargno++; /* this is a 1-based counter */
if (argsprinted == insertorderbyat)
{
appendStringInfoString(&buf, " ORDER BY");
}
else if (argsprinted)
{
appendStringInfoString(&buf, ", ");
}
appendStringInfoString(&buf, modename);
if (argname && argname[0])
{
appendStringInfo(&buf, "%s ", quote_identifier(argname));
}
appendStringInfoString(&buf, format_type_be(argtype));
argsprinted++;
/* nasty hack: print the last arg twice for variadic ordered-set agg */
if (argsprinted == insertorderbyat && i == numargs - 1)
{
i--;
}
}
appendStringInfoString(&buf, ") (");
appendStringInfo(&buf, "STYPE = %s,SFUNC = %s",
format_type_be_qualified(agg->aggtranstype),
quote_qualified_func_name(agg->aggtransfn));
if (agg->aggtransspace != 0)
{
appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace);
}
if (agg->aggfinalfn != InvalidOid)
{
const char *finalmodifystring = NULL;
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
finalmodifystring = "READ_ONLY";
break;
}
case AGGMODIFY_SHAREABLE:
{
finalmodifystring = "SHAREABLE";
break;
}
case AGGMODIFY_READ_WRITE:
{
finalmodifystring = "READ_WRITE";
break;
}
}
appendStringInfo(&buf, ", FINALFUNC = %s",
quote_qualified_func_name(agg->aggfinalfn));
if (finalmodifystring != NULL)
{
appendStringInfo(&buf, ", FINALFUNC_MODIFY = %s", finalmodifystring);
}
if (agg->aggfinalextra)
{
appendStringInfoString(&buf, ", FINALFUNC_EXTRA");
}
}
if (agg->aggmtransspace != 0)
{
appendStringInfo(&buf, ", MSSPACE = %d", agg->aggmtransspace);
}
if (agg->aggmfinalfn)
{
const char *mfinalmodifystring = NULL;
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
mfinalmodifystring = "READ_ONLY";
break;
}
case AGGMODIFY_SHAREABLE:
{
mfinalmodifystring = "SHAREABLE";
break;
}
case AGGMODIFY_READ_WRITE:
{
mfinalmodifystring = "READ_WRITE";
break;
}
}
appendStringInfo(&buf, ", MFINALFUNC = %s",
quote_qualified_func_name(agg->aggmfinalfn));
if (mfinalmodifystring != NULL)
{
appendStringInfo(&buf, ", MFINALFUNC_MODIFY = %s", mfinalmodifystring);
}
if (agg->aggmfinalextra)
{
appendStringInfoString(&buf, ", MFINALFUNC_EXTRA");
}
}
if (agg->aggmtransfn)
{
appendStringInfo(&buf, ", MSFUNC = %s",
quote_qualified_func_name(agg->aggmtransfn));
if (agg->aggmtranstype)
{
appendStringInfo(&buf, ", MSTYPE = %s",
format_type_be_qualified(agg->aggmtranstype));
}
}
if (agg->aggtransspace != 0)
{
appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace);
}
if (agg->aggminvtransfn)
{
appendStringInfo(&buf, ", MINVFUNC = %s",
quote_qualified_func_name(agg->aggminvtransfn));
}
if (agg->aggcombinefn)
{
appendStringInfo(&buf, ", COMBINEFUNC = %s",
quote_qualified_func_name(agg->aggcombinefn));
}
if (agg->aggserialfn)
{
appendStringInfo(&buf, ", SERIALFUNC = %s",
quote_qualified_func_name(agg->aggserialfn));
}
if (agg->aggdeserialfn)
{
appendStringInfo(&buf, ", DESERIALFUNC = %s",
quote_qualified_func_name(agg->aggdeserialfn));
}
if (agg->aggsortop != InvalidOid)
{
appendStringInfo(&buf, ", SORTOP = %s",
generate_operator_name(agg->aggsortop, argtypes[0],
argtypes[0]));
}
{
const char *parallelstring = NULL;
switch (proc->proparallel)
{
case PROPARALLEL_SAFE:
{
parallelstring = "SAFE";
break;
}
case PROPARALLEL_RESTRICTED:
{
parallelstring = "RESTRICTED";
break;
}
case PROPARALLEL_UNSAFE:
{
break;
}
default:
elog(WARNING, "Unknown parallel option, ignoring: %c", proc->proparallel);
}
if (parallelstring != NULL)
{
appendStringInfo(&buf, ", PARALLEL = %s", parallelstring);
}
}
{
bool isNull = false;
Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup,
Anum_pg_aggregate_agginitval,
&isNull);
if (!isNull)
{
char *strInitVal = TextDatumGetCString(textInitVal);
char *strInitValQuoted = quote_literal_cstr(strInitVal);
appendStringInfo(&buf, ", INITCOND = %s", strInitValQuoted);
pfree(strInitValQuoted);
pfree(strInitVal);
}
}
{
bool isNull = false;
Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup,
Anum_pg_aggregate_aggminitval,
&isNull);
if (!isNull)
{
char *strInitVal = TextDatumGetCString(textInitVal);
char *strInitValQuoted = quote_literal_cstr(strInitVal);
appendStringInfo(&buf, ", MINITCOND = %s", strInitValQuoted);
pfree(strInitValQuoted);
pfree(strInitVal);
}
}
if (agg->aggkind == AGGKIND_HYPOTHETICAL)
{
appendStringInfoString(&buf, ", HYPOTHETICAL");
}
appendStringInfoChar(&buf, ')');
#if PG_VERSION_NUM < 120000
{
/* construct a dollar quoted string, making sure the quoted text doesn't contain the dollar quote */
StringInfoData delim;
initStringInfo(&delim);
initStringInfo(&resultbuf);
appendStringInfoChar(&delim, '$');
while (strstr(buf.data, delim.data) != NULL)
{
appendStringInfoChar(&delim, 'z');
}
appendStringInfo(&resultbuf,
"DO %s$BEGIN %s;EXCEPTION WHEN duplicate_function THEN NULL;END%s$",
delim.data, buf.data, delim.data);
pfree(delim.data);
pfree(buf.data);
}
#endif
early_exit:
if (aggtup && HeapTupleIsValid(aggtup))
{
ReleaseSysCache(aggtup);
}
if (proctup && HeapTupleIsValid(proctup))
{
ReleaseSysCache(proctup);
}
#if PG_VERSION_NUM >= 120000
return buf.data;
#else
return resultbuf.data;
#endif
}
/* /*
* EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is * sequential mode, or can still safely be put in sequential mode, it errors if that is
@ -1410,3 +1809,13 @@ ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress)
extensionName))); extensionName)));
} }
} }
/* returns the quoted qualified name of a given function oid */
static char *
quote_qualified_func_name(Oid funcOid)
{
return quote_qualified_identifier(
get_namespace_name(get_func_namespace(funcOid)),
get_func_name(funcOid));
}

View File

@ -122,6 +122,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return PlanAlterFunctionSchemaStmt(stmt, queryString); return PlanAlterFunctionSchemaStmt(stmt, queryString);
@ -197,6 +198,7 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
ProcessAlterFunctionSchemaStmt(stmt, queryString); ProcessAlterFunctionSchemaStmt(stmt, queryString);

View File

@ -426,6 +426,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
ddlJobs = PlanDropFunctionStmt(dropStatement, queryString); ddlJobs = PlanDropFunctionStmt(dropStatement, queryString);
@ -482,6 +483,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString); ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString);
@ -839,6 +841,7 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return PlanAlterFunctionOwnerStmt(stmt, queryString); return PlanAlterFunctionOwnerStmt(stmt, queryString);

View File

@ -121,6 +121,7 @@ DeparseDropStmt(DropStmt *stmt)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return DeparseDropFunctionStmt(stmt); return DeparseDropFunctionStmt(stmt);
@ -187,6 +188,7 @@ DeparseRenameStmt(RenameStmt *stmt)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return DeparseRenameFunctionStmt(stmt); return DeparseRenameFunctionStmt(stmt);
@ -240,6 +242,7 @@ DeparseAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return DeparseAlterFunctionSchemaStmt(stmt); return DeparseAlterFunctionSchemaStmt(stmt);
@ -272,6 +275,7 @@ DeparseAlterOwnerStmt(AlterOwnerStmt *stmt)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return DeparseAlterFunctionOwnerStmt(stmt); return DeparseAlterFunctionOwnerStmt(stmt);
@ -299,6 +303,7 @@ DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt)
switch (stmt->objectType) switch (stmt->objectType)
{ {
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return DeparseAlterFunctionDependsStmt(stmt); return DeparseAlterFunctionDependsStmt(stmt);

View File

@ -90,14 +90,17 @@ AppendAlterFunctionStmt(StringInfo buf, AlterFunctionStmt *stmt)
{ {
appendStringInfo(buf, "ALTER FUNCTION "); appendStringInfo(buf, "ALTER FUNCTION ");
} }
else else if (stmt->objtype == OBJECT_PROCEDURE)
{ {
appendStringInfo(buf, "ALTER PROCEDURE "); appendStringInfo(buf, "ALTER PROCEDURE ");
} }
else
{
appendStringInfo(buf, "ALTER AGGREGATE ");
}
AppendFunctionName(buf, stmt->func, stmt->objtype); AppendFunctionName(buf, stmt->func, stmt->objtype);
foreach(actionCell, stmt->actions) foreach(actionCell, stmt->actions)
{ {
DefElem *def = castNode(DefElem, lfirst(actionCell)); DefElem *def = castNode(DefElem, lfirst(actionCell));
@ -298,7 +301,7 @@ DeparseRenameFunctionStmt(RenameStmt *stmt)
StringInfoData str = { 0 }; StringInfoData str = { 0 };
initStringInfo(&str); initStringInfo(&str);
Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->renameType);
AppendRenameFunctionStmt(&str, stmt); AppendRenameFunctionStmt(&str, stmt);
@ -318,10 +321,14 @@ AppendRenameFunctionStmt(StringInfo buf, RenameStmt *stmt)
{ {
appendStringInfoString(buf, "ALTER FUNCTION "); appendStringInfoString(buf, "ALTER FUNCTION ");
} }
else else if (stmt->renameType == OBJECT_PROCEDURE)
{ {
appendStringInfoString(buf, "ALTER PROCEDURE "); appendStringInfoString(buf, "ALTER PROCEDURE ");
} }
else
{
appendStringInfoString(buf, "ALTER AGGREGATE ");
}
AppendFunctionName(buf, func, stmt->renameType); AppendFunctionName(buf, func, stmt->renameType);
@ -338,7 +345,7 @@ DeparseAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
StringInfoData str = { 0 }; StringInfoData str = { 0 };
initStringInfo(&str); initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionSchemaStmt(&str, stmt); AppendAlterFunctionSchemaStmt(&str, stmt);
@ -358,10 +365,14 @@ AppendAlterFunctionSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{ {
appendStringInfoString(buf, "ALTER FUNCTION "); appendStringInfoString(buf, "ALTER FUNCTION ");
} }
else else if (stmt->objectType == OBJECT_PROCEDURE)
{ {
appendStringInfoString(buf, "ALTER PROCEDURE "); appendStringInfoString(buf, "ALTER PROCEDURE ");
} }
else
{
appendStringInfoString(buf, "ALTER AGGREGATE ");
}
AppendFunctionName(buf, func, stmt->objectType); AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema)); appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema));
@ -377,7 +388,7 @@ DeparseAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
StringInfoData str = { 0 }; StringInfoData str = { 0 };
initStringInfo(&str); initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionOwnerStmt(&str, stmt); AppendAlterFunctionOwnerStmt(&str, stmt);
@ -397,10 +408,14 @@ AppendAlterFunctionOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{ {
appendStringInfoString(buf, "ALTER FUNCTION "); appendStringInfoString(buf, "ALTER FUNCTION ");
} }
else else if (stmt->objectType == OBJECT_PROCEDURE)
{ {
appendStringInfoString(buf, "ALTER PROCEDURE "); appendStringInfoString(buf, "ALTER PROCEDURE ");
} }
else
{
appendStringInfoString(buf, "ALTER AGGREGATE ");
}
AppendFunctionName(buf, func, stmt->objectType); AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " OWNER TO %s;", RoleSpecString(stmt->newowner)); appendStringInfo(buf, " OWNER TO %s;", RoleSpecString(stmt->newowner));
@ -416,7 +431,7 @@ DeparseAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt)
StringInfoData str = { 0 }; StringInfoData str = { 0 };
initStringInfo(&str); initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionDependsStmt(&str, stmt); AppendAlterFunctionDependsStmt(&str, stmt);
@ -436,10 +451,14 @@ AppendAlterFunctionDependsStmt(StringInfo buf, AlterObjectDependsStmt *stmt)
{ {
appendStringInfoString(buf, "ALTER FUNCTION "); appendStringInfoString(buf, "ALTER FUNCTION ");
} }
else else if (stmt->objectType == OBJECT_PROCEDURE)
{ {
appendStringInfoString(buf, "ALTER PROCEDURE "); appendStringInfoString(buf, "ALTER PROCEDURE ");
} }
else
{
appendStringInfoString(buf, "ALTER AGGREGATE ");
}
AppendFunctionName(buf, func, stmt->objectType); AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " DEPENDS ON EXTENSION %s;", strVal(stmt->extname)); appendStringInfo(buf, " DEPENDS ON EXTENSION %s;", strVal(stmt->extname));
@ -455,7 +474,7 @@ DeparseDropFunctionStmt(DropStmt *stmt)
StringInfoData str = { 0 }; StringInfoData str = { 0 };
initStringInfo(&str); initStringInfo(&str);
Assert(stmt->removeType == OBJECT_FUNCTION || stmt->removeType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->removeType);
AppendDropFunctionStmt(&str, stmt); AppendDropFunctionStmt(&str, stmt);
@ -473,10 +492,14 @@ AppendDropFunctionStmt(StringInfo buf, DropStmt *stmt)
{ {
appendStringInfoString(buf, "DROP FUNCTION "); appendStringInfoString(buf, "DROP FUNCTION ");
} }
else else if (stmt->removeType == OBJECT_PROCEDURE)
{ {
appendStringInfoString(buf, "DROP PROCEDURE "); appendStringInfoString(buf, "DROP PROCEDURE ");
} }
else
{
appendStringInfoString(buf, "DROP AGGREGATE ");
}
if (stmt->missing_ok) if (stmt->missing_ok)
{ {

View File

@ -144,6 +144,7 @@ RenameStmtObjectAddress(RenameStmt *stmt, bool missing_ok)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return RenameFunctionStmtObjectAddress(stmt, missing_ok); return RenameFunctionStmtObjectAddress(stmt, missing_ok);
@ -169,6 +170,7 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok); return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok);
@ -215,6 +217,7 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
} }
case OBJECT_PROCEDURE: case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION: case OBJECT_FUNCTION:
{ {
return AlterFunctionOwnerObjectAddress(stmt, missing_ok); return AlterFunctionOwnerObjectAddress(stmt, missing_ok);

View File

@ -28,13 +28,22 @@
#include "utils/syscache.h" #include "utils/syscache.h"
/* forward declaration for qualify functions */ /* forward declaration for qualify functions */
void QualifyFunction(ObjectWithArgs *func, ObjectType type); static void QualifyFunction(ObjectWithArgs *func, ObjectType type);
void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type); static void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type);
/* AssertObjectTypeIsFunctionType asserts we aren't receiving something we shouldn't */
void
AssertObjectTypeIsFunctional(ObjectType type)
{
Assert(type == OBJECT_AGGREGATE || type == OBJECT_FUNCTION || type ==
OBJECT_PROCEDURE);
}
/* /*
* QualifyAlterFunctionStmt transforms a * QualifyAlterFunctionStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. * ALTER {AGGREGATE|FUNCTION|PROCEDURE} ..
* statement in place and makes all (supported) statements fully qualified. * statement in place and makes all (supported) statements fully qualified.
* *
* Note that not all queries of this form are valid AlterFunctionStmt * Note that not all queries of this form are valid AlterFunctionStmt
@ -43,19 +52,21 @@ void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type);
void void
QualifyAlterFunctionStmt(AlterFunctionStmt *stmt) QualifyAlterFunctionStmt(AlterFunctionStmt *stmt)
{ {
AssertObjectTypeIsFunctional(stmt->objtype);
QualifyFunction(stmt->func, stmt->objtype); QualifyFunction(stmt->func, stmt->objtype);
} }
/* /*
* QualifyRenameFunctionStmt transforms a * QualifyRenameFunctionStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. RENAME TO .. * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. RENAME TO ..
* statement in place and makes the function name fully qualified. * statement in place and makes the function name fully qualified.
*/ */
void void
QualifyRenameFunctionStmt(RenameStmt *stmt) QualifyRenameFunctionStmt(RenameStmt *stmt)
{ {
Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->renameType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->renameType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->renameType);
} }
@ -63,13 +74,13 @@ QualifyRenameFunctionStmt(RenameStmt *stmt)
/* /*
* QualifyAlterFunctionSchemaStmt transforms a * QualifyAlterFunctionSchemaStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. SET SCHEMA .. * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. SET SCHEMA ..
* statement in place and makes the function name fully qualified. * statement in place and makes the function name fully qualified.
*/ */
void void
QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt) QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
{ {
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
} }
@ -77,13 +88,13 @@ QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
/* /*
* QualifyAlterFunctionOwnerStmt transforms a * QualifyAlterFunctionOwnerStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. OWNER TO .. * ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. OWNER TO ..
* statement in place and makes the function name fully qualified. * statement in place and makes the function name fully qualified.
*/ */
void void
QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt) QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
{ {
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
} }
@ -91,13 +102,13 @@ QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
/* /*
* QualifyAlterFunctionDependsStmt transforms a * QualifyAlterFunctionDependsStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSIOIN .. * ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSION ..
* statement in place and makes the function name fully qualified. * statement in place and makes the function name fully qualified.
*/ */
void void
QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt) QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt)
{ {
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE); AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType); QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
} }

View File

@ -434,7 +434,6 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes, List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p, bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind); ParseExprKind special_exprkind);
static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ") #define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -7878,7 +7877,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes,
* plus the OPERATOR() decoration needed to use a qualified operator name * plus the OPERATOR() decoration needed to use a qualified operator name
* in an expression. * in an expression.
*/ */
static char * char *
generate_operator_name(Oid operid, Oid arg1, Oid arg2) generate_operator_name(Oid operid, Oid arg1, Oid arg2)
{ {
StringInfoData buf; StringInfoData buf;

View File

@ -435,7 +435,6 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes, List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p, bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind); ParseExprKind special_exprkind);
static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ") #define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -7879,7 +7878,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes,
* plus the OPERATOR() decoration needed to use a qualified operator name * plus the OPERATOR() decoration needed to use a qualified operator name
* in an expression. * in an expression.
*/ */
static char * char *
generate_operator_name(Oid operid, Oid arg1, Oid arg2) generate_operator_name(Oid operid, Oid arg1, Oid arg2)
{ {
StringInfoData buf; StringInfoData buf;

View File

@ -50,6 +50,7 @@ extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer); StringInfo buffer);
extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid); extern char * generate_qualified_relation_name(Oid relid);
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#endif /* CITUS_RULEUTILS_H */ #endif /* CITUS_RULEUTILS_H */

View File

@ -27,6 +27,8 @@ extern char * FormatCollateBE(Oid collate_oid);
extern char * FormatCollateBEQualified(Oid collate_oid); extern char * FormatCollateBEQualified(Oid collate_oid);
extern char * FormatCollateExtended(Oid collid, bits16 flags); extern char * FormatCollateExtended(Oid collid, bits16 flags);
extern void AssertObjectTypeIsFunctional(ObjectType type);
extern void QualifyTreeNode(Node *stmt); extern void QualifyTreeNode(Node *stmt);
extern const char * DeparseTreeNode(Node *stmt); extern const char * DeparseTreeNode(Node *stmt);

View File

@ -37,8 +37,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_text(text, text) RETURNS int -- $function$ is what postgres escapes functions with when deparsing
AS 'select $1::int + $2::int;' -- make sure $function$ doesn't cause invalid syntax
CREATE FUNCTION add_text(text, text) RETURNS text
AS 'select $function$test$function$ || $1::int || $2::int;'
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
@ -70,6 +72,41 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
-- Include aggregate function case
CREATE FUNCTION agg_sfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
CREATE FUNCTION agg_invfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state - item;
end;
$$;
CREATE FUNCTION agg_finalfunc(state int, extra int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state * 2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
msspace = 12,
minvfunc = agg_invfunc,
mfinalfunc = agg_finalfunc,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
-- make sure to propagate ddl propagation after we have setup our functions, this will -- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines -- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on; SET citus.enable_ddl_propagation TO on;
@ -152,6 +189,13 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
t t
(1 row) (1 row)
-- distribute aggregate
SELECT create_distributed_function('sum2(int)');
create_distributed_function
-----------------------------
(1 row)
-- testing alter statements for a distributed function -- testing alter statements for a distributed function
-- ROWS 5, untested because; -- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set -- ERROR: ROWS is not applicable when function does not return a set
@ -256,6 +300,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER B
(2 rows) (2 rows)
ALTER FUNCTION add2(int,int) RENAME TO add; ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers -- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser; ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
@ -264,6 +317,7 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
t t
(1 row) (1 row)
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
SELECT run_command_on_workers($$ SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname) SELECT row(usename, nspname, proname)
FROM pg_proc FROM pg_proc
@ -277,6 +331,19 @@ $$);
(localhost,57638,t,"(functionuser,function_tests,add)") (localhost,57638,t,"(functionuser,function_tests,add)")
(2 rows) (2 rows)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'sum2';
$$);
run_command_on_workers
----------------------------------------------------------
(localhost,57637,t,"(functionuser,function_tests,sum2)")
(localhost,57638,t,"(functionuser,function_tests,sum2)")
(2 rows)
-- change the schema of the function and verify the old schema doesn't exist anymore while -- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function. -- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
@ -301,6 +368,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B
(2 rows) (2 rows)
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -337,6 +405,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
(2 rows) (2 rows)
DROP AGGREGATE function_tests2.sum2(int);
-- call should fail as aggregate should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+---------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests2.sum2(integer) does not exist
localhost | 57638 | f | ERROR: function function_tests2.sum2(integer) does not exist
(2 rows)
-- postgres doesn't accept parameter names in the regprocedure input -- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int" ERROR: syntax error at or near "int"
@ -517,7 +594,7 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass
(1 row) (1 row)
-- now, re-distributed with the default colocation option, we should still see that the same colocation -- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings -- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
create_distributed_function create_distributed_function
----------------------------- -----------------------------

View File

@ -32,8 +32,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_text(text, text) RETURNS int -- $function$ is what postgres escapes functions with when deparsing
AS 'select $1::int + $2::int;' -- make sure $function$ doesn't cause invalid syntax
CREATE FUNCTION add_text(text, text) RETURNS text
AS 'select $function$test$function$ || $1::int || $2::int;'
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
@ -73,6 +75,45 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
-- Include aggregate function case
CREATE FUNCTION agg_sfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
CREATE FUNCTION agg_invfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state - item;
end;
$$;
CREATE FUNCTION agg_finalfunc(state int, extra int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state * 2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
msspace = 12,
minvfunc = agg_invfunc,
mfinalfunc = agg_finalfunc,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
-- make sure to propagate ddl propagation after we have setup our functions, this will -- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines -- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on; SET citus.enable_ddl_propagation TO on;
@ -105,6 +146,9 @@ SELECT create_distributed_function('add(int,int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- distribute aggregate
SELECT create_distributed_function('sum2(int)');
-- testing alter statements for a distributed function -- testing alter statements for a distributed function
-- ROWS 5, untested because; -- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set -- ERROR: ROWS is not applicable when function does not return a set
@ -140,9 +184,14 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
ALTER FUNCTION add2(int,int) RENAME TO add; ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers -- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser; ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
SELECT run_command_on_workers($$ SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname) SELECT row(usename, nspname, proname)
FROM pg_proc FROM pg_proc
@ -150,6 +199,13 @@ JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace) JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'add'; WHERE proname = 'add';
$$); $$);
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'sum2';
$$);
-- change the schema of the function and verify the old schema doesn't exist anymore while -- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function. -- the new schema has the function.
@ -159,6 +215,8 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -177,6 +235,10 @@ DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped -- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
DROP AGGREGATE function_tests2.sum2(int);
-- call should fail as aggregate should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2;
-- postgres doesn't accept parameter names in the regprocedure input -- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
@ -270,7 +332,7 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass
objects.objid = 'add_with_param_names(int, int)'::regprocedure; objects.objid = 'add_with_param_names(int, int)'::regprocedure;
-- now, re-distributed with the default colocation option, we should still see that the same colocation -- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings -- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects FROM pg_dist_partition, citus.pg_dist_object as objects