create_distributed_function: accept aggregates

Adds support for OCLASS_PROC to worker_create_or_replace_object
pull/3159/head
Philip Dubé 2019-09-27 18:26:07 +00:00 committed by Philip Dubé
parent 622ee54c95
commit 2fc45e5897
23 changed files with 1343 additions and 258 deletions

View File

@ -144,24 +144,6 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{
switch (getObjectClass(dependency))
{
case OCLASS_SCHEMA:
{
const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
if (schemaDDLCommand == NULL)
{
/* no schema to create */
return NIL;
}
return list_make1((void *) schemaDDLCommand);
}
case OCLASS_TYPE:
{
return CreateTypeDDLCommandsIdempotent(dependency);
}
case OCLASS_CLASS:
{
/*
@ -182,6 +164,24 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return CreateFunctionDDLCommandsIdempotent(dependency);
}
case OCLASS_SCHEMA:
{
const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
if (schemaDDLCommand == NULL)
{
/* no schema to create */
return NIL;
}
return list_make1((void *) schemaDDLCommand);
}
case OCLASS_TYPE:
{
return CreateTypeDDLCommandsIdempotent(dependency);
}
default:
{
break;

View File

@ -24,10 +24,12 @@
#endif
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
@ -40,7 +42,9 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_transaction.h"
#include "nodes/makefuncs.h"
#include "parser/parse_coerce.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
@ -49,12 +53,13 @@
#include "utils/fmgrprotos.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/regproc.h"
#define argumentStartsWith(arg, prefix) \
(strncmp(arg, prefix, strlen(prefix)) == 0)
/* forward declaration for helper functions*/
static char * GetFunctionDDLCommand(const RegProcedure funcOid);
static char * GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace);
static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid);
static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
Oid *distributionArgumentOid);
@ -75,13 +80,11 @@ static ObjectAddress * FunctionToObjectAddress(ObjectType objectType,
bool missing_ok);
static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt);
static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress);
static char * quote_qualified_func_name(Oid funcOid);
PG_FUNCTION_INFO_V1(create_distributed_function);
#define AssertIsFunctionOrProcedure(objtype) \
Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE)
/*
* create_distributed_function gets a function or procedure name with their list of
@ -95,7 +98,9 @@ create_distributed_function(PG_FUNCTION_ARGS)
text *distributionArgumentNameText = NULL; /* optional */
text *colocateWithText = NULL; /* optional */
const char *ddlCommand = NULL;
StringInfoData ddlCommand = { 0 };
const char *createFunctionSQL = NULL;
const char *alterFunctionOwnerSQL = NULL;
ObjectAddress functionAddress = { 0 };
int distributionArgumentIndex = -1;
@ -154,9 +159,11 @@ create_distributed_function(PG_FUNCTION_ARGS)
EnsureDependenciesExistsOnAllNodes(&functionAddress);
ddlCommand = GetFunctionDDLCommand(funcOid);
SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand);
createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
initStringInfo(&ddlCommand);
appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
SendCommandToWorkersAsUser(ALL_WORKERS, CurrentUserName(), ddlCommand.data);
MarkObjectDistributed(&functionAddress);
@ -215,11 +222,14 @@ List *
CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionAddress)
{
char *ddlCommand = NULL;
char *alterFunctionOwnerSQL = NULL;
Assert(functionAddress->classId == ProcedureRelationId);
ddlCommand = GetFunctionDDLCommand(functionAddress->objectId);
return list_make1(ddlCommand);
ddlCommand = GetFunctionDDLCommand(functionAddress->objectId, true);
alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(functionAddress->objectId);
return list_make2(ddlCommand, alterFunctionOwnerSQL);
}
@ -548,39 +558,42 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
/*
* GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for
* the specified function followed by "ALTER FUNCTION .. SET OWNER ..".
*
* useCreateOrReplace is ignored for non-aggregate functions.
*/
static char *
GetFunctionDDLCommand(const RegProcedure funcOid)
char *
GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
{
StringInfo ddlCommand = makeStringInfo();
OverrideSearchPath *overridePath = NULL;
Datum sqlTextDatum = 0;
char *createFunctionSQL = NULL;
char *alterFunctionOwnerSQL = NULL;
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
if (get_func_prokind(funcOid) == PROKIND_AGGREGATE)
{
createFunctionSQL = GetAggregateDDLCommand(funcOid, useCreateOrReplace);
}
else
{
Datum sqlTextDatum = (Datum) 0;
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef,
ObjectIdGetDatum(funcOid));
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
/* revert back to original search_path */
PopOverrideSearchPath();
PushOverrideSearchPath(overridePath);
sqlTextDatum = DirectFunctionCall1(pg_get_functiondef,
ObjectIdGetDatum(funcOid));
createFunctionSQL = TextDatumGetCString(sqlTextDatum);
createFunctionSQL = TextDatumGetCString(sqlTextDatum);
alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
/* revert back to original search_path */
PopOverrideSearchPath();
}
appendStringInfo(ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
return ddlCommand->data;
return createFunctionSQL;
}
@ -593,7 +606,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid));
StringInfo alterCommand = makeStringInfo();
bool isProcedure = false;
char *kindString = "FUNCTION";
Oid procOwner = InvalidOid;
char *functionSignature = NULL;
@ -610,7 +623,14 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
procOwner = procform->proowner;
isProcedure = procform->prokind == PROKIND_PROCEDURE;
if (procform->prokind == PROKIND_PROCEDURE)
{
kindString = "PROCEDURE";
}
else if (procform->prokind == PROKIND_AGGREGATE)
{
kindString = "AGGREGATE";
}
ReleaseSysCache(proctup);
}
@ -646,7 +666,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
functionOwner = GetUserNameFromId(procOwner, false);
appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;",
(isProcedure ? "PROCEDURE" : "FUNCTION"),
kindString,
functionSignature,
quote_identifier(functionOwner));
@ -654,6 +674,362 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
}
/*
* GetAggregateDDLCommand returns a string for creating an aggregate.
* CREATE OR REPLACE AGGREGATE was only introduced in pg12,
* so a second parameter useCreateOrReplace signals whether to
* to create a plain CREATE AGGREGATE or not. In pg11 we return a string
* which is a call to worker_create_or_replace_object in lieu of
* CREATE OR REPLACE AGGREGATE.
*/
static char *
GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
{
StringInfoData buf = { 0 };
HeapTuple proctup = NULL;
Form_pg_proc proc = NULL;
HeapTuple aggtup = NULL;
Form_pg_aggregate agg = NULL;
const char *name = NULL;
const char *nsp = NULL;
int numargs = 0;
int i = 0;
Oid *argtypes = NULL;
char **argnames = NULL;
char *argmodes = NULL;
int insertorderbyat = -1;
int argsprinted = 0;
int inputargno = 0;
proctup = SearchSysCache1(PROCOID, funcOid);
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "cache lookup failed for %d", funcOid);
}
proc = (Form_pg_proc) GETSTRUCT(proctup);
Assert(proc->prokind == PROKIND_AGGREGATE);
initStringInfo(&buf);
name = NameStr(proc->proname);
nsp = get_namespace_name(proc->pronamespace);
#if PG_VERSION_NUM >= 120000
if (useCreateOrReplace)
{
appendStringInfo(&buf, "CREATE OR REPLACE AGGREGATE %s(",
quote_qualified_identifier(nsp, name));
}
else
{
appendStringInfo(&buf, "CREATE AGGREGATE %s(",
quote_qualified_identifier(nsp, name));
}
#else
appendStringInfo(&buf, "CREATE AGGREGATE %s(",
quote_qualified_identifier(nsp, name));
#endif
/* Parameters, borrows heavily from print_function_arguments in postgres */
numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes);
aggtup = SearchSysCache1(AGGFNOID, funcOid);
if (!HeapTupleIsValid(aggtup))
{
elog(ERROR, "cache lookup failed for %d", funcOid);
}
agg = (Form_pg_aggregate) GETSTRUCT(aggtup);
if (AGGKIND_IS_ORDERED_SET(agg->aggkind))
{
insertorderbyat = agg->aggnumdirectargs;
}
for (i = 0; i < numargs; i++)
{
Oid argtype = argtypes[i];
char *argname = argnames ? argnames[i] : NULL;
char argmode = argmodes ? argmodes[i] : PROARGMODE_IN;
const char *modename;
switch (argmode)
{
case PROARGMODE_IN:
{
modename = "";
break;
}
case PROARGMODE_VARIADIC:
{
modename = "VARIADIC ";
break;
}
default:
{
elog(ERROR, "unexpected parameter mode '%c'", argmode);
modename = NULL;
break;
}
}
inputargno++; /* this is a 1-based counter */
if (argsprinted == insertorderbyat)
{
appendStringInfoString(&buf, " ORDER BY ");
}
else if (argsprinted)
{
appendStringInfoString(&buf, ", ");
}
appendStringInfoString(&buf, modename);
if (argname && argname[0])
{
appendStringInfo(&buf, "%s ", quote_identifier(argname));
}
appendStringInfoString(&buf, format_type_be_qualified(argtype));
argsprinted++;
/* nasty hack: print the last arg twice for variadic ordered-set agg */
if (argsprinted == insertorderbyat && i == numargs - 1)
{
i--;
}
}
appendStringInfo(&buf, ") (STYPE = %s,SFUNC = %s",
format_type_be_qualified(agg->aggtranstype),
quote_qualified_func_name(agg->aggtransfn));
if (agg->aggtransspace != 0)
{
appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace);
}
if (agg->aggfinalfn != InvalidOid)
{
const char *finalmodifystring = NULL;
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
finalmodifystring = "READ_ONLY";
break;
}
case AGGMODIFY_SHAREABLE:
{
finalmodifystring = "SHAREABLE";
break;
}
case AGGMODIFY_READ_WRITE:
{
finalmodifystring = "READ_WRITE";
break;
}
}
appendStringInfo(&buf, ", FINALFUNC = %s",
quote_qualified_func_name(agg->aggfinalfn));
if (finalmodifystring != NULL)
{
appendStringInfo(&buf, ", FINALFUNC_MODIFY = %s", finalmodifystring);
}
if (agg->aggfinalextra)
{
appendStringInfoString(&buf, ", FINALFUNC_EXTRA");
}
}
if (agg->aggmtransspace != 0)
{
appendStringInfo(&buf, ", MSSPACE = %d", agg->aggmtransspace);
}
if (agg->aggmfinalfn)
{
const char *mfinalmodifystring = NULL;
switch (agg->aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
{
mfinalmodifystring = "READ_ONLY";
break;
}
case AGGMODIFY_SHAREABLE:
{
mfinalmodifystring = "SHAREABLE";
break;
}
case AGGMODIFY_READ_WRITE:
{
mfinalmodifystring = "READ_WRITE";
break;
}
}
appendStringInfo(&buf, ", MFINALFUNC = %s",
quote_qualified_func_name(agg->aggmfinalfn));
if (mfinalmodifystring != NULL)
{
appendStringInfo(&buf, ", MFINALFUNC_MODIFY = %s", mfinalmodifystring);
}
if (agg->aggmfinalextra)
{
appendStringInfoString(&buf, ", MFINALFUNC_EXTRA");
}
}
if (agg->aggmtransfn)
{
appendStringInfo(&buf, ", MSFUNC = %s",
quote_qualified_func_name(agg->aggmtransfn));
if (agg->aggmtranstype)
{
appendStringInfo(&buf, ", MSTYPE = %s",
format_type_be_qualified(agg->aggmtranstype));
}
}
if (agg->aggtransspace != 0)
{
appendStringInfo(&buf, ", SSPACE = %d", agg->aggtransspace);
}
if (agg->aggminvtransfn)
{
appendStringInfo(&buf, ", MINVFUNC = %s",
quote_qualified_func_name(agg->aggminvtransfn));
}
if (agg->aggcombinefn)
{
appendStringInfo(&buf, ", COMBINEFUNC = %s",
quote_qualified_func_name(agg->aggcombinefn));
}
if (agg->aggserialfn)
{
appendStringInfo(&buf, ", SERIALFUNC = %s",
quote_qualified_func_name(agg->aggserialfn));
}
if (agg->aggdeserialfn)
{
appendStringInfo(&buf, ", DESERIALFUNC = %s",
quote_qualified_func_name(agg->aggdeserialfn));
}
if (agg->aggsortop != InvalidOid)
{
appendStringInfo(&buf, ", SORTOP = %s",
generate_operator_name(agg->aggsortop, argtypes[0],
argtypes[0]));
}
{
const char *parallelstring = NULL;
switch (proc->proparallel)
{
case PROPARALLEL_SAFE:
{
parallelstring = "SAFE";
break;
}
case PROPARALLEL_RESTRICTED:
{
parallelstring = "RESTRICTED";
break;
}
case PROPARALLEL_UNSAFE:
{
break;
}
default:
{
elog(WARNING, "Unknown parallel option, ignoring: %c", proc->proparallel);
break;
}
}
if (parallelstring != NULL)
{
appendStringInfo(&buf, ", PARALLEL = %s", parallelstring);
}
}
{
bool isNull = false;
Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup,
Anum_pg_aggregate_agginitval,
&isNull);
if (!isNull)
{
char *strInitVal = TextDatumGetCString(textInitVal);
char *strInitValQuoted = quote_literal_cstr(strInitVal);
appendStringInfo(&buf, ", INITCOND = %s", strInitValQuoted);
pfree(strInitValQuoted);
pfree(strInitVal);
}
}
{
bool isNull = false;
Datum textInitVal = SysCacheGetAttr(AGGFNOID, aggtup,
Anum_pg_aggregate_aggminitval,
&isNull);
if (!isNull)
{
char *strInitVal = TextDatumGetCString(textInitVal);
char *strInitValQuoted = quote_literal_cstr(strInitVal);
appendStringInfo(&buf, ", MINITCOND = %s", strInitValQuoted);
pfree(strInitValQuoted);
pfree(strInitVal);
}
}
if (agg->aggkind == AGGKIND_HYPOTHETICAL)
{
appendStringInfoString(&buf, ", HYPOTHETICAL");
}
appendStringInfoChar(&buf, ')');
ReleaseSysCache(aggtup);
ReleaseSysCache(proctup);
#if PG_VERSION_NUM < 120000
if (useCreateOrReplace)
{
return WrapCreateOrReplace(buf.data);
}
#endif
return buf.data;
}
/*
* EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
@ -856,7 +1232,6 @@ List *
ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString)
{
const ObjectAddress *address = NULL;
const char *sql = NULL;
List *commands = NIL;
if (!ShouldPropagateCreateFunction(stmt))
@ -867,10 +1242,9 @@ ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString)
address = GetObjectAddressFromParseTree((Node *) stmt, false);
EnsureDependenciesExistsOnAllNodes(address);
sql = GetFunctionDDLCommand(address->objectId);
commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
commands = list_make4(DISABLE_DDL_PROPAGATION,
GetFunctionDDLCommand(address->objectId, true),
GetFunctionAlterOwnerCommand(address->objectId),
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(ALL_WORKERS, commands);
@ -907,6 +1281,27 @@ CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, bool missing_ok)
}
const ObjectAddress *
DefineAggregateStmtObjectAddress(DefineStmt *stmt, bool missing_ok)
{
ObjectWithArgs *objectWithArgs = NULL;
ListCell *parameterCell = NULL;
Assert(stmt->kind == OBJECT_AGGREGATE);
objectWithArgs = makeNode(ObjectWithArgs);
objectWithArgs->objname = stmt->defnames;
foreach(parameterCell, linitial(stmt->args))
{
FunctionParameter *funcParam = castNode(FunctionParameter, lfirst(parameterCell));
objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType);
}
return FunctionToObjectAddress(OBJECT_AGGREGATE, objectWithArgs, missing_ok);
}
/*
* PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we
* plan the jobs to be executed on the workers for functions that have been distributed in
@ -919,7 +1314,7 @@ PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString)
const ObjectAddress *address = NULL;
List *commands = NIL;
AssertIsFunctionOrProcedure(stmt->objtype);
AssertObjectTypeIsFunctional(stmt->objtype);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
@ -956,7 +1351,7 @@ PlanRenameFunctionStmt(RenameStmt *stmt, const char *queryString)
const ObjectAddress *address = NULL;
List *commands = NIL;
AssertIsFunctionOrProcedure(stmt->renameType);
AssertObjectTypeIsFunctional(stmt->renameType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
@ -990,7 +1385,7 @@ PlanAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString
const ObjectAddress *address = NULL;
List *commands = NIL;
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
@ -1025,7 +1420,7 @@ PlanAlterFunctionOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
const char *sql = NULL;
List *commands = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
@ -1067,7 +1462,7 @@ PlanDropFunctionStmt(DropStmt *stmt, const char *queryString)
ListCell *objectWithArgsListCell = NULL;
DropStmt *stmtCopy = NULL;
AssertIsFunctionOrProcedure(stmt->removeType);
AssertObjectTypeIsFunctional(stmt->removeType);
if (creating_extension)
{
@ -1169,7 +1564,7 @@ PlanAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt, const char *queryStri
const ObjectAddress *address = NULL;
const char *functionName = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
if (creating_extension)
{
@ -1217,7 +1612,7 @@ PlanAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt, const char *queryStri
const ObjectAddress *
AlterFunctionDependsStmtObjectAddress(AlterObjectDependsStmt *stmt, bool missing_ok)
{
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
return FunctionToObjectAddress(stmt->objectType,
castNode(ObjectWithArgs, stmt->object), missing_ok);
@ -1234,7 +1629,7 @@ ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStr
{
const ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(address))
@ -1300,7 +1695,7 @@ AlterFunctionSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_o
List *names = NIL;
ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(stmt->objectType);
AssertObjectTypeIsFunctional(stmt->objectType);
objectWithArgs = castNode(ObjectWithArgs, stmt->object);
funcOid = LookupFuncWithArgs(stmt->objectType, objectWithArgs, true);
@ -1348,6 +1743,115 @@ AlterFunctionSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_o
}
/*
* GenerateBackupNameForProcCollision generates a new proc name for an existing proc. The
* name is generated in such a way that the new name doesn't overlap with an existing proc
* by adding a suffix with incrementing number after the new name.
*/
char *
GenerateBackupNameForProcCollision(const ObjectAddress *address)
{
char *newName = palloc0(NAMEDATALEN);
char suffix[NAMEDATALEN] = { 0 };
int count = 0;
Value *namespace = makeString(get_namespace_name(get_func_namespace(
address->objectId)));
char *baseName = get_func_name(address->objectId);
int baseLength = strlen(baseName);
int numargs = 0;
Oid *argtypes = NULL;
char **argnames = NULL;
char *argmodes = NULL;
HeapTuple proctup = SearchSysCache1(PROCOID, address->objectId);
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "citus cache lookup failed.");
}
numargs = get_func_arg_info(proctup, &argtypes, &argnames, &argmodes);
ReleaseSysCache(proctup);
while (true)
{
int suffixLength = snprintf(suffix, NAMEDATALEN - 1, "(citus_backup_%d)",
count);
List *newProcName = NIL;
FuncCandidateList clist = NULL;
/* trim the base name at the end to leave space for the suffix and trailing \0 */
baseLength = Min(baseLength, NAMEDATALEN - suffixLength - 1);
/* clear newName before copying the potentially trimmed baseName and suffix */
memset(newName, 0, NAMEDATALEN);
strncpy(newName, baseName, baseLength);
strncpy(newName + baseLength, suffix, suffixLength);
newProcName = list_make2(namespace, makeString(newName));
/* don't need to rename if the input arguments don't match */
clist = FuncnameGetCandidates(newProcName, numargs, NIL, false, false, true);
for (; clist; clist = clist->next)
{
if (memcmp(clist->args, argtypes, sizeof(Oid) * numargs) == 0)
{
break;
}
}
if (!clist)
{
return newName;
}
count++;
}
}
/*
* ObjectWithArgsFromOid returns the corresponding ObjectWithArgs node for a given pg_proc oid
*/
ObjectWithArgs *
ObjectWithArgsFromOid(Oid funcOid)
{
ObjectWithArgs *objectWithArgs = makeNode(ObjectWithArgs);
List *objargs = NIL;
Oid *argTypes = NULL;
char **argNames = NULL;
char *argModes = NULL;
int numargs = 0;
int i = 0;
HeapTuple proctup = SearchSysCache1(PROCOID, funcOid);
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "citus cache lookup failed.");
}
numargs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes);
objectWithArgs->objname = list_make2(
makeString(get_namespace_name(get_func_namespace(funcOid))),
makeString(get_func_name(funcOid))
);
for (i = 0; i < numargs; i++)
{
if (argModes == NULL ||
argModes[i] != PROARGMODE_OUT || argModes[i] != PROARGMODE_TABLE)
{
objargs = lappend(objargs, makeTypeNameFromOid(argTypes[i], -1));
}
}
objectWithArgs->objargs = objargs;
ReleaseSysCache(proctup);
return objectWithArgs;
}
/*
* FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on
* its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to
@ -1361,7 +1865,7 @@ FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs,
Oid funcOid = InvalidOid;
ObjectAddress *address = NULL;
AssertIsFunctionOrProcedure(objectType);
AssertObjectTypeIsFunctional(objectType);
funcOid = LookupFuncWithArgs(objectType, objectWithArgs, missing_ok);
address = palloc0(sizeof(ObjectAddress));
@ -1428,3 +1932,13 @@ ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress)
extensionName)));
}
}
/* returns the quoted qualified name of a given function oid */
static char *
quote_qualified_func_name(Oid funcOid)
{
return quote_qualified_identifier(
get_namespace_name(get_func_namespace(funcOid)),
get_func_name(funcOid));
}

View File

@ -122,6 +122,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return PlanAlterFunctionSchemaStmt(stmt, queryString);
@ -197,6 +198,7 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
ProcessAlterFunctionSchemaStmt(stmt, queryString);

View File

@ -60,6 +60,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "miscadmin.h"
@ -78,7 +79,6 @@
#define ALTER_TYPE_OWNER_COMMAND "ALTER TYPE %s OWNER TO %s;"
#define CREATE_OR_REPLACE_COMMAND "SELECT worker_create_or_replace_object(%s);"
/* guc to turn of the automatic type distribution */
@ -90,7 +90,6 @@ static List * TypeNameListToObjectAddresses(List *objects);
static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation);
static void EnsureSequentialModeForTypeDDL(void);
static Oid GetTypeOwner(Oid typeOid);
static const char * WrapCreateOrReplace(const char *sql);
/* recreate functions */
static CompositeTypeStmt * RecreateCompositeTypeStmt(Oid typeOid);
@ -1144,7 +1143,7 @@ CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress)
/*
* GenerateBackupNameForTypeCollision generates a new type name for an existing type. The
* name is generated in such a way that the new name doesn't overlap with an existing type
* by adding a postfix with incrementing number after the new name.
* by adding a suffix with incrementing number after the new name.
*/
char *
GenerateBackupNameForTypeCollision(const ObjectAddress *address)
@ -1152,26 +1151,26 @@ GenerateBackupNameForTypeCollision(const ObjectAddress *address)
List *names = stringToQualifiedNameList(format_type_be_qualified(address->objectId));
RangeVar *rel = makeRangeVarFromNameList(names);
char newName[NAMEDATALEN] = { 0 };
char postfix[NAMEDATALEN] = { 0 };
char *newName = palloc0(NAMEDATALEN);
char suffix[NAMEDATALEN] = { 0 };
char *baseName = rel->relname;
int baseLength = strlen(baseName);
int count = 0;
while (true)
{
int postfixLength = snprintf(postfix, NAMEDATALEN - 1, "(citus_backup_%d)",
count);
int baseLength = strlen(baseName);
int suffixLength = snprintf(suffix, NAMEDATALEN - 1, "(citus_backup_%d)",
count);
TypeName *newTypeName = NULL;
Oid typeOid = InvalidOid;
/* trim the base name at the end to leave space for the postfix and trailing \0 */
baseLength = Min(baseLength, NAMEDATALEN - postfixLength - 1);
/* trim the base name at the end to leave space for the suffix and trailing \0 */
baseLength = Min(baseLength, NAMEDATALEN - suffixLength - 1);
/* clear newName before copying the potentially trimmed baseName and postfix */
/* clear newName before copying the potentially trimmed baseName and suffix */
memset(newName, 0, NAMEDATALEN);
strncpy(newName, baseName, baseLength);
strncpy(newName + baseLength, postfix, postfixLength);
strncpy(newName + baseLength, suffix, suffixLength);
rel->relname = newName;
newTypeName = makeTypeNameFromNameList(MakeNameListFromRangeVar(rel));
@ -1179,11 +1178,7 @@ GenerateBackupNameForTypeCollision(const ObjectAddress *address)
typeOid = LookupTypeNameOid(NULL, newTypeName, true);
if (typeOid == InvalidOid)
{
/*
* Typename didn't exist yet.
* Need to pstrdup the name as it was stack allocated during calculations.
*/
return pstrdup(newName);
return newName;
}
count++;
@ -1191,40 +1186,6 @@ GenerateBackupNameForTypeCollision(const ObjectAddress *address)
}
/*
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
* The rename statement will rename the existing object on its address to the value
* provided in newName.
*/
RenameStmt *
CreateRenameTypeStmt(const ObjectAddress *address, char *newName)
{
RenameStmt *stmt = NULL;
stmt = makeNode(RenameStmt);
stmt->renameType = OBJECT_TYPE;
stmt->object = (Node *) stringToQualifiedNameList(format_type_be_qualified(
address->objectId));
stmt->newname = newName;
return stmt;
}
/*
* WrapCreateOrReplace takes a sql CREATE command and wraps it in a call to citus' udf to
* create or replace the existing object based on its create command.
*/
const char *
WrapCreateOrReplace(const char *sql)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf, CREATE_OR_REPLACE_COMMAND, quote_literal_cstr(sql));
return buf.data;
}
/*
* FilterNameListForDistributedTypes takes a list of objects to delete, for Types this
* will be a list of TypeName. This list is filtered against the types that are

View File

@ -424,6 +424,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
ddlJobs = PlanDropFunctionStmt(dropStatement, queryString);
@ -480,6 +481,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString);
@ -837,6 +839,7 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return PlanAlterFunctionOwnerStmt(stmt, queryString);

View File

@ -35,11 +35,8 @@ static const char * DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt);
* - ALTER TYPE
* - DROP TYPE
*
* - ALTER FUNCTION
* - DROP FUNCTION
*
* - ALTER PROCEDURE
* - DROP PROCEDURE
* - ALTER FUNCTION, ALTER PROCEDURE, ALTER AGGREGATE
* - DROP FUNCTION, DROP PROCEDURE, DROP AGGREGATE
*/
const char *
DeparseTreeNode(Node *stmt)
@ -121,6 +118,7 @@ DeparseDropStmt(DropStmt *stmt)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return DeparseDropFunctionStmt(stmt);
@ -187,6 +185,7 @@ DeparseRenameStmt(RenameStmt *stmt)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return DeparseRenameFunctionStmt(stmt);
@ -240,6 +239,7 @@ DeparseAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return DeparseAlterFunctionSchemaStmt(stmt);
@ -272,6 +272,7 @@ DeparseAlterOwnerStmt(AlterOwnerStmt *stmt)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return DeparseAlterFunctionOwnerStmt(stmt);
@ -299,6 +300,7 @@ DeparseAlterObjectDependsStmt(AlterObjectDependsStmt *stmt)
switch (stmt->objectType)
{
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return DeparseAlterFunctionDependsStmt(stmt);

View File

@ -41,6 +41,8 @@
/* forward declaration for deparse functions */
static const char * ObjectTypeToKeyword(ObjectType objtype);
static void AppendAlterFunctionStmt(StringInfo buf, AlterFunctionStmt *stmt);
static void AppendDropFunctionStmt(StringInfo buf, DropStmt *stmt);
static void AppendFunctionName(StringInfo buf, ObjectWithArgs *func, ObjectType objtype);
@ -78,6 +80,37 @@ DeparseAlterFunctionStmt(AlterFunctionStmt *stmt)
}
/*
* ObjectTypeToKeyword returns an appropriate string for the given ObjectType
* Where the string will be one of "FUNCTION", "PROCEDURE", or "AGGREGATE"
*/
static const char *
ObjectTypeToKeyword(ObjectType objtype)
{
switch (objtype)
{
case OBJECT_FUNCTION:
{
return "FUNCTION";
}
case OBJECT_PROCEDURE:
{
return "PROCEDURE";
}
case OBJECT_AGGREGATE:
{
return "AGGREGATE";
}
default:
elog(ERROR, "Unknown object type: %d", objtype);
return NULL;
}
}
/*
* AppendAlterFunctionStmt appends a string representing the AlterFunctionStmt to a buffer
*/
@ -86,18 +119,9 @@ AppendAlterFunctionStmt(StringInfo buf, AlterFunctionStmt *stmt)
{
ListCell *actionCell = NULL;
if (stmt->objtype == OBJECT_FUNCTION)
{
appendStringInfo(buf, "ALTER FUNCTION ");
}
else
{
appendStringInfo(buf, "ALTER PROCEDURE ");
}
appendStringInfo(buf, "ALTER %s ", ObjectTypeToKeyword(stmt->objtype));
AppendFunctionName(buf, stmt->func, stmt->objtype);
foreach(actionCell, stmt->actions)
{
DefElem *def = castNode(DefElem, lfirst(actionCell));
@ -298,7 +322,7 @@ DeparseRenameFunctionStmt(RenameStmt *stmt)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->renameType);
AppendRenameFunctionStmt(&str, stmt);
@ -314,17 +338,8 @@ AppendRenameFunctionStmt(StringInfo buf, RenameStmt *stmt)
{
ObjectWithArgs *func = castNode(ObjectWithArgs, stmt->object);
if (stmt->renameType == OBJECT_FUNCTION)
{
appendStringInfoString(buf, "ALTER FUNCTION ");
}
else
{
appendStringInfoString(buf, "ALTER PROCEDURE ");
}
appendStringInfo(buf, "ALTER %s ", ObjectTypeToKeyword(stmt->renameType));
AppendFunctionName(buf, func, stmt->renameType);
appendStringInfo(buf, " RENAME TO %s;", quote_identifier(stmt->newname));
}
@ -338,7 +353,7 @@ DeparseAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionSchemaStmt(&str, stmt);
@ -354,15 +369,7 @@ AppendAlterFunctionSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
ObjectWithArgs *func = castNode(ObjectWithArgs, stmt->object);
if (stmt->objectType == OBJECT_FUNCTION)
{
appendStringInfoString(buf, "ALTER FUNCTION ");
}
else
{
appendStringInfoString(buf, "ALTER PROCEDURE ");
}
appendStringInfo(buf, "ALTER %s ", ObjectTypeToKeyword(stmt->objectType));
AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " SET SCHEMA %s;", quote_identifier(stmt->newschema));
}
@ -377,7 +384,7 @@ DeparseAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionOwnerStmt(&str, stmt);
@ -393,15 +400,7 @@ AppendAlterFunctionOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{
ObjectWithArgs *func = castNode(ObjectWithArgs, stmt->object);
if (stmt->objectType == OBJECT_FUNCTION)
{
appendStringInfoString(buf, "ALTER FUNCTION ");
}
else
{
appendStringInfoString(buf, "ALTER PROCEDURE ");
}
appendStringInfo(buf, "ALTER %s ", ObjectTypeToKeyword(stmt->objectType));
AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " OWNER TO %s;", RoleSpecString(stmt->newowner));
}
@ -416,7 +415,7 @@ DeparseAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
AppendAlterFunctionDependsStmt(&str, stmt);
@ -432,15 +431,7 @@ AppendAlterFunctionDependsStmt(StringInfo buf, AlterObjectDependsStmt *stmt)
{
ObjectWithArgs *func = castNode(ObjectWithArgs, stmt->object);
if (stmt->objectType == OBJECT_FUNCTION)
{
appendStringInfoString(buf, "ALTER FUNCTION ");
}
else
{
appendStringInfoString(buf, "ALTER PROCEDURE ");
}
appendStringInfo(buf, "ALTER %s ", ObjectTypeToKeyword(stmt->objectType));
AppendFunctionName(buf, func, stmt->objectType);
appendStringInfo(buf, " DEPENDS ON EXTENSION %s;", strVal(stmt->extname));
}
@ -455,7 +446,7 @@ DeparseDropFunctionStmt(DropStmt *stmt)
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->removeType == OBJECT_FUNCTION || stmt->removeType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->removeType);
AppendDropFunctionStmt(&str, stmt);
@ -469,14 +460,7 @@ DeparseDropFunctionStmt(DropStmt *stmt)
static void
AppendDropFunctionStmt(StringInfo buf, DropStmt *stmt)
{
if (stmt->removeType == OBJECT_FUNCTION)
{
appendStringInfoString(buf, "DROP FUNCTION ");
}
else
{
appendStringInfoString(buf, "DROP PROCEDURE ");
}
appendStringInfo(buf, "DROP %s ", ObjectTypeToKeyword(stmt->removeType));
if (stmt->missing_ok)
{

View File

@ -96,6 +96,19 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
castNode(AlterObjectDependsStmt, parseTree), missing_ok);
}
case T_DefineStmt:
{
DefineStmt *stmt = castNode(DefineStmt, parseTree);
if (stmt->kind == OBJECT_AGGREGATE)
{
return DefineAggregateStmtObjectAddress(stmt, missing_ok);
}
ereport(ERROR, (errmsg(
"unsupported object type to get object address for DefineStmt")));
return NULL;
}
default:
{
/*
@ -144,6 +157,7 @@ RenameStmtObjectAddress(RenameStmt *stmt, bool missing_ok)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return RenameFunctionStmtObjectAddress(stmt, missing_ok);
@ -169,6 +183,7 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok);
@ -215,6 +230,7 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
}
case OBJECT_PROCEDURE:
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
{
return AlterFunctionOwnerObjectAddress(stmt, missing_ok);

View File

@ -28,13 +28,23 @@
#include "utils/syscache.h"
/* forward declaration for qualify functions */
void QualifyFunction(ObjectWithArgs *func, ObjectType type);
void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type);
static void QualifyFunction(ObjectWithArgs *func, ObjectType type);
static void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type);
/* AssertObjectTypeIsFunctionType asserts we aren't receiving something we shouldn't */
void
AssertObjectTypeIsFunctional(ObjectType type)
{
Assert(type == OBJECT_AGGREGATE ||
type == OBJECT_FUNCTION ||
type == OBJECT_PROCEDURE);
}
/*
* QualifyAlterFunctionStmt transforms a
* ALTER {FUNCTION|PROCEDURE} ..
* ALTER {AGGREGATE|FUNCTION|PROCEDURE} ..
* statement in place and makes all (supported) statements fully qualified.
*
* Note that not all queries of this form are valid AlterFunctionStmt
@ -43,19 +53,21 @@ void QualifyFunctionSchemaName(ObjectWithArgs *func, ObjectType type);
void
QualifyAlterFunctionStmt(AlterFunctionStmt *stmt)
{
AssertObjectTypeIsFunctional(stmt->objtype);
QualifyFunction(stmt->func, stmt->objtype);
}
/*
* QualifyRenameFunctionStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. RENAME TO ..
* ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. RENAME TO ..
* statement in place and makes the function name fully qualified.
*/
void
QualifyRenameFunctionStmt(RenameStmt *stmt)
{
Assert(stmt->renameType == OBJECT_FUNCTION || stmt->renameType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->renameType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->renameType);
}
@ -63,13 +75,13 @@ QualifyRenameFunctionStmt(RenameStmt *stmt)
/*
* QualifyAlterFunctionSchemaStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. SET SCHEMA ..
* ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. SET SCHEMA ..
* statement in place and makes the function name fully qualified.
*/
void
QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
{
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
}
@ -77,13 +89,13 @@ QualifyAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt)
/*
* QualifyAlterFunctionOwnerStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. OWNER TO ..
* ALTER {AGGREGATE|FUNCTION|PROCEDURE} .. OWNER TO ..
* statement in place and makes the function name fully qualified.
*/
void
QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
{
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
}
@ -91,13 +103,13 @@ QualifyAlterFunctionOwnerStmt(AlterOwnerStmt *stmt)
/*
* QualifyAlterFunctionDependsStmt transforms a
* ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSIOIN ..
* ALTER {FUNCTION|PROCEDURE} .. DEPENDS ON EXTENSION ..
* statement in place and makes the function name fully qualified.
*/
void
QualifyAlterFunctionDependsStmt(AlterObjectDependsStmt *stmt)
{
Assert(stmt->objectType == OBJECT_FUNCTION || stmt->objectType == OBJECT_PROCEDURE);
AssertObjectTypeIsFunctional(stmt->objectType);
QualifyFunction(castNode(ObjectWithArgs, stmt->object), stmt->objectType);
}

View File

@ -434,7 +434,6 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind);
static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -7878,7 +7877,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes,
* plus the OPERATOR() decoration needed to use a qualified operator name
* in an expression.
*/
static char *
char *
generate_operator_name(Oid operid, Oid arg1, Oid arg2)
{
StringInfoData buf;

View File

@ -435,7 +435,6 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind);
static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -7879,7 +7878,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes,
* plus the OPERATOR() decoration needed to use a qualified operator name
* in an expression.
*/
static char *
char *
generate_operator_name(Oid operid, Oid arg1, Oid arg2)
{
StringInfoData buf;

View File

@ -1,7 +1,6 @@
/*-------------------------------------------------------------------------
*
* worker_create_if_not_exist.c
* TODO rename file and document, was named after old function
* worker_create_or_replace.c
*
* Copyright (c) 2019, Citus Data, Inc.
*
@ -11,6 +10,7 @@
#include "postgres.h"
#include "catalog/dependency.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
@ -19,21 +19,37 @@
#include "tcop/dest.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/metadata/distobject.h"
#include "distributed/worker_create_or_replace.h"
#include "distributed/worker_protocol.h"
static Node * CreateStmtByObjectAddress(const ObjectAddress *address);
static const char * CreateStmtByObjectAddress(const ObjectAddress *address);
static RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
static char * GenerateBackupNameForCollision(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
/*
* WrapCreateOrReplace takes a sql CREATE command and wraps it in a call to citus' udf to
* create or replace the existing object based on its create command.
*/
char *
WrapCreateOrReplace(const char *sql)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf, CREATE_OR_REPLACE_COMMAND, quote_literal_cstr(sql));
return buf.data;
}
/*
* worker_create_or_replace_object(statement text)
*
@ -68,18 +84,15 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
address = GetObjectAddressFromParseTree(parseTree, true);
if (ObjectExists(address))
{
Node *localCreateStmt = NULL;
const char *localSqlStatement = NULL;
char *newName = NULL;
RenameStmt *renameStmt = NULL;
const char *sqlRenameStmt = NULL;
RenameStmt *renameStmt = NULL;
const char *localSqlStatement = CreateStmtByObjectAddress(address);
localCreateStmt = CreateStmtByObjectAddress(address);
localSqlStatement = DeparseTreeNode(localCreateStmt);
if (strcmp(sqlStatement, localSqlStatement) == 0)
{
/*
* TODO string compare is a poor mans comparison, but calling equal on the
* TODO string compare is a poor man's comparison, but calling equal on the
* parsetree's returns false because there is extra information list character
* position of some sort
*/
@ -98,7 +111,8 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
renameStmt = CreateRenameStatement(address, newName);
sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
CitusProcessUtility((Node *) renameStmt, sqlRenameStmt, PROCESS_UTILITY_TOPLEVEL,
CitusProcessUtility((Node *) renameStmt, sqlRenameStmt,
PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
@ -119,19 +133,25 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
* therefore you cannot equal this tree against parsed statement. Instead it can be
* deparsed to do a string comparison.
*/
static Node *
static const char *
CreateStmtByObjectAddress(const ObjectAddress *address)
{
switch (getObjectClass(address))
{
case OCLASS_PROC:
{
return GetFunctionDDLCommand(address->objectId, false);
}
case OCLASS_TYPE:
{
return CreateTypeStmtByObjectAddress(address);
return DeparseTreeNode(CreateTypeStmtByObjectAddress(address));
}
default:
{
ereport(ERROR, (errmsg("unsupported object to construct a create statment")));
ereport(ERROR, (errmsg(
"unsupported object to construct a create statement")));
}
}
}
@ -147,6 +167,11 @@ GenerateBackupNameForCollision(const ObjectAddress *address)
{
switch (getObjectClass(address))
{
case OCLASS_PROC:
{
return GenerateBackupNameForProcCollision(address);
}
case OCLASS_TYPE:
{
return GenerateBackupNameForTypeCollision(address);
@ -162,6 +187,67 @@ GenerateBackupNameForCollision(const ObjectAddress *address)
}
/*
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
* The rename statement will rename the existing object on its address to the value
* provided in newName.
*/
static RenameStmt *
CreateRenameTypeStmt(const ObjectAddress *address, char *newName)
{
RenameStmt *stmt = makeNode(RenameStmt);
stmt->renameType = OBJECT_TYPE;
stmt->object = (Node *) stringToQualifiedNameList(format_type_be_qualified(
address->objectId));
stmt->newname = newName;
return stmt;
}
/*
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
* The rename statement will rename the existing object on its address to the value
* provided in newName.
*/
static RenameStmt *
CreateRenameProcStmt(const ObjectAddress *address, char *newName)
{
RenameStmt *stmt = makeNode(RenameStmt);
switch (get_func_prokind(address->objectId))
{
case PROKIND_AGGREGATE:
{
stmt->renameType = OBJECT_AGGREGATE;
break;
}
case PROKIND_PROCEDURE:
{
stmt->renameType = OBJECT_PROCEDURE;
break;
}
case PROKIND_FUNCTION:
{
stmt->renameType = OBJECT_FUNCTION;
break;
}
default:
elog(ERROR, "Unexpected prokind");
return NULL;
}
stmt->object = (Node *) ObjectWithArgsFromOid(address->objectId);
stmt->newname = newName;
return stmt;
}
/*
* CreateRenameStatement creates a rename statement for an existing object to rename the
* object to newName.
@ -171,6 +257,11 @@ CreateRenameStatement(const ObjectAddress *address, char *newName)
{
switch (getObjectClass(address))
{
case OCLASS_PROC:
{
return CreateRenameProcStmt(address, newName);
}
case OCLASS_TYPE:
{
return CreateRenameTypeStmt(address, newName);

View File

@ -50,6 +50,7 @@ extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer);
extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
#endif /* CITUS_RULEUTILS_H */

View File

@ -53,6 +53,8 @@ extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const
char *queryString);
extern const ObjectAddress * CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt,
bool missing_ok);
extern const ObjectAddress * DefineAggregateStmtObjectAddress(DefineStmt *stmt, bool
missing_ok);
extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString);
extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt,
bool missing_ok);
@ -189,10 +191,12 @@ extern const ObjectAddress * AlterTypeOwnerObjectAddress(AlterOwnerStmt *stmt,
bool missing_ok);
extern List * CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress);
extern char * GenerateBackupNameForTypeCollision(const ObjectAddress *address);
extern RenameStmt * CreateRenameTypeStmt(const ObjectAddress *address, char *newName);
/* function.c - forward declarations */
extern List * CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionAddress);
extern char * GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace);
extern char * GenerateBackupNameForProcCollision(const ObjectAddress *address);
extern ObjectWithArgs * ObjectWithArgsFromOid(Oid funcOid);
/* vacuum.c - froward declarations */
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);

View File

@ -27,6 +27,8 @@ extern char * FormatCollateBE(Oid collate_oid);
extern char * FormatCollateBEQualified(Oid collate_oid);
extern char * FormatCollateExtended(Oid collid, bits16 flags);
extern void AssertObjectTypeIsFunctional(ObjectType type);
extern void QualifyTreeNode(Node *stmt);
extern const char * DeparseTreeNode(Node *stmt);

View File

@ -0,0 +1,21 @@
/*-------------------------------------------------------------------------
*
* worker_create_or_replace.h
* Header for handling CREATE OR REPLACE of objects,
* even if postgres lacks CREATE OR REPLACE for those objects
*
* Copyright (c) Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_CREATE_OR_REPLACE_H
#define WORKER_CREATE_OR_REPLACE_H
#define CREATE_OR_REPLACE_COMMAND "SELECT worker_create_or_replace_object(%s);"
extern char * WrapCreateOrReplace(const char *sql);
#endif /* WORKER_CREATE_OR_REPLACE_H */

View File

@ -119,7 +119,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -157,7 +157,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -194,7 +194,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -232,7 +232,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -269,7 +269,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -307,7 +307,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -344,7 +344,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -384,7 +384,7 @@ HAVING hll_cardinality(hll_union_agg(unique_users)) > 1;
Filter: (hll_cardinality(hll_union_agg(remote_scan.worker_column_3)) > '1'::double precision)
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task

View File

@ -119,7 +119,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -157,7 +157,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -194,7 +194,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -232,7 +232,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -269,7 +269,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -307,7 +307,7 @@ GROUP BY(1);
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -344,7 +344,7 @@ GROUP BY(1);
------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
@ -384,7 +384,7 @@ HAVING hll_cardinality(hll_union_agg(unique_users)) > 1;
Filter: (hll_cardinality(hll_union_agg(remote_scan.worker_column_3)) > '1'::double precision)
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Real-Time)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task

View File

@ -37,8 +37,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_text(text, text) RETURNS int
AS 'select $1::int + $2::int;'
-- $function$ is what postgres escapes functions with when deparsing
-- make sure $function$ doesn't cause invalid syntax
CREATE FUNCTION add_text(text, text) RETURNS text
AS 'select $function$test$function$ || $1::int || $2::int;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -73,6 +75,66 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- Include aggregate function case
CREATE FUNCTION agg_sfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
CREATE FUNCTION agg_invfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state - item;
end;
$$;
CREATE FUNCTION agg_finalfunc(state int, extra int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state * 2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
msspace = 12,
minvfunc = agg_invfunc,
mfinalfunc = agg_finalfunc,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
-- Test VARIADIC, example taken from postgres test suite
CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
stype = internal,
sfunc = ordered_set_transition_multi,
finalfunc = rank_final,
finalfunc_extra,
hypothetical
);
-- Test deparsing multiple parameters with names
CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result)
RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$
select x.f1 + yz.f1, x.f2 || yz.f2;
$$;
CREATE FUNCTION agg_names_finalfunc(x dup_result)
RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$
begin
return x.f1;
end;
$$;
CREATE AGGREGATE agg_names(x dup_result, yz dup_result) (
stype = dup_result,
sfunc = agg_names_sfunc,
finalfunc = agg_names_finalfunc,
finalfunc_modify = shareable
);
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
@ -184,6 +246,25 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
t
(1 row)
-- distribute aggregate
SELECT create_distributed_function('sum2(int)');
create_distributed_function
-----------------------------
(1 row)
SELECT create_distributed_function('my_rank("any")');
create_distributed_function
-----------------------------
(1 row)
SELECT create_distributed_function('agg_names(dup_result,dup_result)');
create_distributed_function
-----------------------------
(1 row)
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
@ -288,6 +369,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER B
(2 rows)
ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 1
localhost | 57638 | t | 1
(2 rows)
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
@ -296,6 +386,7 @@ SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
t
(1 row)
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
@ -309,6 +400,19 @@ $$);
(localhost,57638,t,"(functionuser,function_tests,add)")
(2 rows)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'sum2';
$$);
run_command_on_workers
----------------------------------------------------------
(localhost,57637,t,"(functionuser,function_tests,sum2)")
(localhost,57638,t,"(functionuser,function_tests,sum2)")
(2 rows)
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
@ -333,6 +437,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B
(2 rows)
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -369,6 +474,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
(2 rows)
DROP AGGREGATE function_tests2.sum2(int);
-- call should fail as aggregate should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+---------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests2.sum2(integer) does not exist
localhost | 57638 | f | ERROR: function function_tests2.sum2(integer) does not exist
(2 rows)
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int"
@ -549,7 +663,7 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass
(1 row)
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings
-- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
create_distributed_function
-----------------------------

View File

@ -0,0 +1,144 @@
-- This is designed to test worker_create_or_replace_object in PG11 with aggregates
-- Note in PG12 we use CREATE OR REPLACE AGGREGATE, thus the renaming does not occur
CREATE SCHEMA proc_conflict;
SELECT run_command_on_workers($$CREATE SCHEMA proc_conflict;$$);
run_command_on_workers
-------------------------------------
(localhost,57637,t,"CREATE SCHEMA")
(localhost,57638,t,"CREATE SCHEMA")
(2 rows)
\c - - - :worker_1_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 2 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
\c - - - :master_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * i + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
SELECT create_distributed_function('existing_agg(int)');
create_distributed_function
-----------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
existing_agg
--------------
78
(1 row)
\c - - - :master_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
existing_agg
--------------
78
(1 row)
-- Now drop & recreate in order to make sure rename detects the existing renamed objects
-- hide cascades
SET client_min_messages TO error;
DROP AGGREGATE existing_agg(int) CASCADE;
DROP FUNCTION existing_func(int, int) CASCADE;
\c - - - :worker_1_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 3 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
\c - - - :master_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 5 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
SELECT create_distributed_function('existing_agg(int)');
create_distributed_function
-----------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
existing_agg
--------------
76
(1 row)
\c - - - :master_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
existing_agg
--------------
76
(1 row)
-- now test worker_create_or_replace_object directly
CREATE FUNCTION existing_func2(state int, i int) RETURNS int AS $$
BEGIN
RETURN state + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
worker_create_or_replace_object
---------------------------------
t
(1 row)
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
worker_create_or_replace_object
---------------------------------
f
(1 row)
-- hide cascades
SET client_min_messages TO error;
DROP SCHEMA proc_conflict CASCADE;

View File

@ -281,7 +281,7 @@ test: ssl_by_default
# object distribution tests
# ---------
test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value
test: distributed_functions
test: distributed_functions distributed_functions_conflict
test: distributed_procedure
# ---------

View File

@ -32,8 +32,10 @@ CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_text(text, text) RETURNS int
AS 'select $1::int + $2::int;'
-- $function$ is what postgres escapes functions with when deparsing
-- make sure $function$ doesn't cause invalid syntax
CREATE FUNCTION add_text(text, text) RETURNS text
AS 'select $function$test$function$ || $1::int || $2::int;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -77,6 +79,75 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- Include aggregate function case
CREATE FUNCTION agg_sfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
CREATE FUNCTION agg_invfunc(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state - item;
end;
$$;
CREATE FUNCTION agg_finalfunc(state int, extra int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state * 2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
msspace = 12,
minvfunc = agg_invfunc,
mfinalfunc = agg_finalfunc,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
-- Test VARIADIC, example taken from postgres test suite
CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
stype = internal,
sfunc = ordered_set_transition_multi,
finalfunc = rank_final,
finalfunc_extra,
hypothetical
);
-- Test deparsing multiple parameters with names
CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result)
RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$
select x.f1 + yz.f1, x.f2 || yz.f2;
$$;
CREATE FUNCTION agg_names_finalfunc(x dup_result)
RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$
begin
return x.f1;
end;
$$;
CREATE AGGREGATE agg_names(x dup_result, yz dup_result) (
stype = dup_result,
sfunc = agg_names_sfunc,
finalfunc = agg_names_finalfunc,
finalfunc_modify = shareable
);
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
@ -127,6 +198,12 @@ SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'strea
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
-- distribute aggregate
SELECT create_distributed_function('sum2(int)');
SELECT create_distributed_function('my_rank("any")');
SELECT create_distributed_function('agg_names(dup_result,dup_result)');
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
@ -162,9 +239,14 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
@ -172,6 +254,13 @@ JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'add';
$$);
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace)
WHERE proname = 'sum2';
$$);
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
@ -181,6 +270,8 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -199,6 +290,10 @@ DROP FUNCTION add(int,int);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
DROP AGGREGATE function_tests2.sum2(int);
-- call should fail as aggregate should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2;
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
@ -292,7 +387,7 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creationg settings
-- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects

View File

@ -0,0 +1,121 @@
-- This is designed to test worker_create_or_replace_object in PG11 with aggregates
-- Note in PG12 we use CREATE OR REPLACE AGGREGATE, thus the renaming does not occur
CREATE SCHEMA proc_conflict;
SELECT run_command_on_workers($$CREATE SCHEMA proc_conflict;$$);
\c - - - :worker_1_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 2 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
\c - - - :master_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * i + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
SELECT create_distributed_function('existing_agg(int)');
\c - - - :worker_1_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
\c - - - :master_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
-- Now drop & recreate in order to make sure rename detects the existing renamed objects
-- hide cascades
SET client_min_messages TO error;
DROP AGGREGATE existing_agg(int) CASCADE;
DROP FUNCTION existing_func(int, int) CASCADE;
\c - - - :worker_1_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 3 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
\c - - - :master_port
SET search_path TO proc_conflict;
CREATE FUNCTION existing_func(state int, i int) RETURNS int AS $$
BEGIN
RETURN state * 5 + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
CREATE AGGREGATE existing_agg(int) (
SFUNC = existing_func,
STYPE = int
);
SELECT create_distributed_function('existing_agg(int)');
\c - - - :worker_1_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
\c - - - :master_port
SET search_path TO proc_conflict;
WITH data (val) AS (
select 2
union all select 4
union all select 6
)
SELECT existing_agg(val) FROM data;
-- now test worker_create_or_replace_object directly
CREATE FUNCTION existing_func2(state int, i int) RETURNS int AS $$
BEGIN
RETURN state + i;
END;
$$ LANGUAGE plpgsql STRICT IMMUTABLE;
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
-- hide cascades
SET client_min_messages TO error;
DROP SCHEMA proc_conflict CASCADE;