Propagate DROP ROUTINE, ALTER ROUTINE

In two places I've made code more straight forward by using ROUTINE in our own codegen

Two changes which may seem extraneous:

AppendFunctionName was updated to not use pg_get_function_identity_arguments.
This is because that function includes ORDER BY when printing an aggregate like my_rank.
While ALTER AGGREGATE my_rank(x "any" ORDER BY y "any") is accepted by postgres,
ALTER ROUTINE my_rank(x "any" ORDER BY y "any") is not.

Tests were updated to use macaddr over integer. Using integer is flaky, our logic
could sometimes end up on tables like users_table. I originally wanted to use money,
but money isn't hashable.
pull/3367/head
Philip Dubé 2020-01-07 21:37:21 +00:00
parent 8b4429e2dd
commit ccabf19090
8 changed files with 383 additions and 371 deletions

View File

@ -337,6 +337,41 @@ static DistributeObjectOps Procedure_Rename = {
.postprocess = NULL,
.address = RenameFunctionStmtObjectAddress,
};
static DistributeObjectOps Routine_AlterObjectDepends = {
.deparse = DeparseAlterFunctionDependsStmt,
.qualify = QualifyAlterFunctionDependsStmt,
.preprocess = PreprocessAlterFunctionDependsStmt,
.postprocess = NULL,
.address = AlterFunctionDependsStmtObjectAddress,
};
static DistributeObjectOps Routine_AlterObjectSchema = {
.deparse = DeparseAlterFunctionSchemaStmt,
.qualify = QualifyAlterFunctionSchemaStmt,
.preprocess = PreprocessAlterFunctionSchemaStmt,
.postprocess = PostprocessAlterFunctionSchemaStmt,
.address = AlterFunctionSchemaStmtObjectAddress,
};
static DistributeObjectOps Routine_AlterOwner = {
.deparse = DeparseAlterFunctionOwnerStmt,
.qualify = QualifyAlterFunctionOwnerStmt,
.preprocess = PreprocessAlterFunctionOwnerStmt,
.postprocess = NULL,
.address = AlterFunctionOwnerObjectAddress,
};
static DistributeObjectOps Routine_Drop = {
.deparse = DeparseDropFunctionStmt,
.qualify = NULL,
.preprocess = PreprocessDropFunctionStmt,
.postprocess = NULL,
.address = NULL,
};
static DistributeObjectOps Routine_Rename = {
.deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt,
.preprocess = PreprocessRenameFunctionStmt,
.postprocess = NULL,
.address = RenameFunctionStmtObjectAddress,
};
static DistributeObjectOps Schema_Drop = {
.deparse = NULL,
.qualify = NULL,
@ -440,6 +475,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_AlterObjectDepends;
}
case OBJECT_ROUTINE:
{
return &Routine_AlterObjectDepends;
}
default:
{
return &NoDistributeOps;
@ -477,6 +517,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_AlterObjectSchema;
}
case OBJECT_ROUTINE:
{
return &Routine_AlterObjectSchema;
}
case OBJECT_TYPE:
{
return &Type_AlterObjectSchema;
@ -514,6 +559,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_AlterOwner;
}
case OBJECT_ROUTINE:
{
return &Routine_AlterOwner;
}
case OBJECT_TYPE:
{
return &Type_AlterOwner;
@ -665,6 +715,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Drop;
}
case OBJECT_ROUTINE:
{
return &Routine_Drop;
}
case OBJECT_SCHEMA:
{
return &Schema_Drop;
@ -732,6 +787,11 @@ GetDistributeObjectOps(Node *node)
return &Procedure_Rename;
}
case OBJECT_ROUTINE:
{
return &Routine_Rename;
}
case OBJECT_TYPE:
{
return &Type_Rename;

View File

@ -592,7 +592,6 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid));
StringInfo alterCommand = makeStringInfo();
char *kindString = "FUNCTION";
Oid procOwner = InvalidOid;
@ -602,15 +601,6 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
procOwner = procform->proowner;
if (procform->prokind == PROKIND_PROCEDURE)
{
kindString = "PROCEDURE";
}
else if (procform->prokind == PROKIND_AGGREGATE)
{
kindString = "AGGREGATE";
}
ReleaseSysCache(proctup);
}
else if (!OidIsValid(funcOid) || !HeapTupleIsValid(proctup))
@ -625,8 +615,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid)
char *functionSignature = format_procedure_qualified(funcOid);
char *functionOwner = GetUserNameFromId(procOwner, false);
appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;",
kindString,
appendStringInfo(alterCommand, "ALTER ROUTINE %s OWNER TO %s;",
functionSignature,
quote_identifier(functionOwner));

View File

@ -39,6 +39,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/regproc.h"
/* forward declaration for deparse functions */
@ -107,6 +108,11 @@ ObjectTypeToKeyword(ObjectType objtype)
return "AGGREGATE";
}
case OBJECT_ROUTINE:
{
return "ROUTINE";
}
default:
elog(ERROR, "Unknown object type: %d", objtype);
return NULL;
@ -623,13 +629,9 @@ AppendFunctionNameList(StringInfo buf, List *objects, ObjectType objtype)
static void
AppendFunctionName(StringInfo buf, ObjectWithArgs *func, ObjectType objtype)
{
char *functionName = NULL;
char *schemaName = NULL;
Oid funcid = LookupFuncWithArgs(objtype, func, true);
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(proctup))
if (funcid == InvalidOid)
{
/*
* DROP FUNCTION IF EXISTS absent_function arrives here
@ -637,65 +639,33 @@ AppendFunctionName(StringInfo buf, ObjectWithArgs *func, ObjectType objtype)
* There is no namespace associated with the nonexistent function,
* thus we return the function name as it is provided
*/
char *functionName = NULL;
char *schemaName = NULL;
DeconstructQualifiedName(func->objname, &schemaName, &functionName);
char *qualifiedFunctionName = quote_qualified_identifier(schemaName,
functionName);
appendStringInfoString(buf, qualifiedFunctionName);
if (!func->args_unspecified)
{
/*
* The function is not found, but there is an argument list specified, this has
* some known issues with the "any" type. However this is mostly a bug in
* postgres' TypeNameListToString. For now the best we can do until we understand
* the underlying cause better.
*/
const char *args = TypeNameListToString(func->objargs);
appendStringInfo(buf, "(%s)", args);
}
}
else
{
Form_pg_proc procform = (Form_pg_proc) GETSTRUCT(proctup);
functionName = NameStr(procform->proname);
functionName = pstrdup(functionName); /* we release the tuple before used */
schemaName = get_namespace_name(procform->pronamespace);
ReleaseSysCache(proctup);
char *functionSignature = format_procedure_qualified(funcid);
appendStringInfoString(buf, functionSignature);
}
char *qualifiedFunctionName = quote_qualified_identifier(schemaName, functionName);
appendStringInfoString(buf, qualifiedFunctionName);
if (OidIsValid(funcid))
{
/*
* If the function exists we want to use pg_get_function_identity_arguments to
* serialize its canonical arguments
*/
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
Datum sqlTextDatum = DirectFunctionCall1(pg_get_function_identity_arguments,
ObjectIdGetDatum(funcid));
/* revert back to original search_path */
PopOverrideSearchPath();
const char *args = TextDatumGetCString(sqlTextDatum);
appendStringInfo(buf, "(%s)", args);
}
else if (!func->args_unspecified)
{
/*
* The function is not found, but there is an argument list specified, this has
* some known issues with the "any" type. However this is mostly a bug in
* postgres' TypeNameListToString. For now the best we can do until we understand
* the underlying cause better.
*/
const char *args = TypeNameListToString(func->objargs);
appendStringInfo(buf, "(%s)", args);
}
/*
* If the type is not found, and no argument list given we don't append anything here.
* This will cause mostly the same sql as the original statement.
*/
}

View File

@ -38,7 +38,8 @@ AssertObjectTypeIsFunctional(ObjectType type)
{
Assert(type == OBJECT_AGGREGATE ||
type == OBJECT_FUNCTION ||
type == OBJECT_PROCEDURE);
type == OBJECT_PROCEDURE ||
type == OBJECT_ROUTINE);
}

View File

@ -258,31 +258,7 @@ CreateRenameProcStmt(const ObjectAddress *address, char *newName)
{
RenameStmt *stmt = makeNode(RenameStmt);
switch (get_func_prokind(address->objectId))
{
case PROKIND_AGGREGATE:
{
stmt->renameType = OBJECT_AGGREGATE;
break;
}
case PROKIND_PROCEDURE:
{
stmt->renameType = OBJECT_PROCEDURE;
break;
}
case PROKIND_FUNCTION:
{
stmt->renameType = OBJECT_FUNCTION;
break;
}
default:
elog(ERROR, "Unexpected prokind");
return NULL;
}
stmt->renameType = OBJECT_ROUTINE;
stmt->object = (Node *) ObjectWithArgsFromOid(address->objectId);
stmt->newname = newName;

View File

@ -14,13 +14,13 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
SET search_path TO function_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
AS 'select $1 + $2;'
CREATE FUNCTION eq8(macaddr8, macaddr8) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -40,15 +40,15 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution
SET citus.enable_ddl_propagation TO off;
CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
CREATE TYPE dup_result AS (f1 macaddr, f2 text);
CREATE FUNCTION dup(macaddr) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION eq_with_param_names(val1 macaddr, val2 macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -57,8 +57,8 @@ CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION "add_mi'xed_param_names"(integer, "va'l1" integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION "eq_mi'xed_param_names"(macaddr, "va'l1" macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -108,7 +108,7 @@ CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
-- Test deparsing multiple parameters with names
CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result)
RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$
select x.f1 + yz.f1, x.f2 || yz.f2;
select x.f1 | yz.f1, x.f2 || yz.f2;
$$;
CREATE FUNCTION agg_names_finalfunc(x dup_result)
RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$
@ -136,7 +136,7 @@ SELECT create_distributed_table('statement_table','id');
(1 row)
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
CREATE TABLE streaming_table(id macaddr);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
@ -155,25 +155,25 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('"add_mi''xed_param_names"(int, int)');
SELECT create_distributed_function('"eq_mi''xed_param_names"(macaddr, macaddr)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mi''xed_param_names(int, int)'::regprocedure;
WHERE objid = 'eq_mi''xed_param_names(macaddr, macaddr)'::regprocedure;
?column? | ?column?
---------------------------------------------------------------------
t | t
(1 row)
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests."add_mi''xed_param_names"(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests."eq_mi'xed_param_names"('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
localhost | 57637 | t | f
localhost | 57638 | t | f
(2 rows)
-- make sure that none of the active and primary nodes hasmetadata
@ -201,33 +201,33 @@ ERROR: cannot create a function with a distribution argument when citus.replica
HINT: Set citus.replication_model to 'streaming' before creating distributed tables
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
nodename | nodeport | success | result
SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | (42,"42 is text")
localhost | 57638 | t | (42,"42 is text")
localhost | 57637 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text")
localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text")
(2 rows)
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
localhost | 57637 | t | f
localhost | 57638 | t | f
(2 rows)
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
@ -255,129 +255,129 @@ SELECT create_distributed_function('agg_names(dup_result,dup_result)');
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- Test SET/RESET for alter function
ALTER FUNCTION add(int,int) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET "citus.setting;'" TO 'hello '' world';
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET "citus.setting;'" TO 'hello '' world';
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RESET "citus.setting;'";
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) RESET "citus.setting;'";
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET search_path TO 'sch'';ma', public;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RESET search_path;
ALTER FUNCTION eq(macaddr,macaddr) RESET search_path;
-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT;
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER;
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT SECURITY DEFINER;
ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function
HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value.
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER FUNCTION add(int,int) RENAME TO add2;
SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq2(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012346789ab','012345689ab');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq2('012345689ab','012345689ab');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
localhost | 57637 | t | t
localhost | 57638 | t | t
(2 rows)
ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER ROUTINE eq2(macaddr,macaddr) RENAME TO eq;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
nodename | nodeport | success | result
@ -388,99 +388,109 @@ SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'su
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
ALTER ROUTINE my_rank("any") OWNER TO functionuser;
ALTER AGGREGATE my_rank("any") OWNER TO functionuser;
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
SELECT array_agg(row(usename, nspname, proname) order by proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'add';
WHERE proname IN ('eq', 'sum2', 'my_rank');
$$);
run_command_on_workers
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"(functionuser,function_tests,add)")
(localhost,57638,t,"(functionuser,function_tests,add)")
(2 rows)
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'sum2';
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"(functionuser,function_tests,sum2)")
(localhost,57638,t,"(functionuser,function_tests,sum2)")
(localhost,57637,t,"{""(functionuser,function_tests,eq)"",""(functionuser,function_tests,my_rank)"",""(functionuser,function_tests,sum2)""}")
(localhost,57638,t,"{""(functionuser,function_tests,eq)"",""(functionuser,function_tests,my_rank)"",""(functionuser,function_tests,sum2)""}")
(2 rows)
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
(2 rows)
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests2.eq('012345689ab','ba9876543210');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
localhost | 57637 | t | f
localhost | 57638 | t | f
(2 rows)
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER ROUTINE function_tests2.eq(macaddr,macaddr) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 <> $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','012345689ab');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | 6
localhost | 57638 | t | 6
localhost | 57637 | t | f
localhost | 57638 | t | f
(2 rows)
-- distributed functions should not be allowed to depend on an extension, also functions
-- that depend on an extension should not be allowed to be distributed.
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;
ALTER FUNCTION eq(macaddr,macaddr) DEPENDS ON EXTENSION citus;
ERROR: distrtibuted functions are not allowed to depend on an extension
DETAIL: Function "function_tests.add(integer,integer)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on.
DETAIL: Function "function_tests.eq(pg_catalog.macaddr,pg_catalog.macaddr)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on.
SELECT create_distributed_function('pg_catalog.citus_drop_trigger()');
ERROR: unable to create a distributed function from functions owned by an extension
DETAIL: Function "pg_catalog.citus_drop_trigger()" has a dependency on extension "citus". Functions depending on an extension cannot be distributed. Create the function by creating the extension on the workers.
DROP FUNCTION add(int,int);
DROP FUNCTION eq(macaddr,macaddr);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist
localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
(2 rows)
-- Test DROP for ROUTINE
CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
select create_distributed_function('eq(macaddr,macaddr)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
DROP ROUTINE eq(macaddr, macaddr);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist
(2 rows)
DROP AGGREGATE function_tests2.sum2(int);
@ -493,51 +503,50 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (sele
(2 rows)
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int"
CONTEXT: invalid type name "val1 int"
SELECT create_distributed_function('eq_with_param_names(val1 macaddr, macaddr)', 'val1');
ERROR: invalid type name "val1 macaddr"
-- invalid distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='test');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='int');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- invalid distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$0');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-1');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-10');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$3');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1a');
ERROR: invalid input syntax for integer: "1a"
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'aaa');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- NULL function
SELECT create_distributed_function(NULL);
ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses
HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL
-- NULL colocate_with
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', NULL);
ERROR: colocate_with parameter should not be NULL
HINT: To use the default value, set colocate_with option to "default"
-- empty string distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- The first distributed function syncs the metadata to nodes
-- and metadata syncing is not supported within transaction blocks
BEGIN;
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1');
create_distributed_function
---------------------------------------------------------------------
@ -545,7 +554,7 @@ BEGIN;
ROLLBACK;
-- make sure that none of the nodes have the function because we've rollbacked
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
@ -560,7 +569,7 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
(1 row)
-- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1');
create_distributed_function
---------------------------------------------------------------------
@ -574,7 +583,7 @@ select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'p
(1 row)
-- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
@ -582,14 +591,14 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_
(2 rows)
-- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- valid distribution with distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
create_distributed_function
---------------------------------------------------------------------
@ -597,7 +606,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)','$1');
-- a function cannot be colocated with a table that is not "streaming" replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table_func_test (a int);
CREATE TABLE replicated_table_func_test (a macaddr);
SET citus.replication_model TO "statement";
SELECT create_distributed_table('replicated_table_func_test', 'a');
create_distributed_table
@ -605,8 +614,8 @@ SELECT create_distributed_table('replicated_table_func_test', 'a');
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test');
ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
SELECT public.wait_until_metadata_sync();
@ -618,7 +627,7 @@ SELECT public.wait_until_metadata_sync();
-- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_2 (a bigint);
CREATE TABLE replicated_table_func_test_2 (a macaddr8);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
create_distributed_table
@ -626,19 +635,19 @@ SELECT create_distributed_table('replicated_table_func_test_2', 'a');
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_2');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- colocate_with cannot be used without distribution key
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_2');
ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid
HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided
-- a function cannot be colocated with a local table
CREATE TABLE replicated_table_func_test_3 (a bigint);
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
CREATE TABLE replicated_table_func_test_3 (a macaddr8);
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3');
ERROR: relation replicated_table_func_test_3 is not distributed
-- a function cannot be colocated with a reference table
SELECT create_reference_table('replicated_table_func_test_3');
@ -647,11 +656,11 @@ SELECT create_reference_table('replicated_table_func_test_3');
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables.
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3');
ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables.
-- finally, colocate the function with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_4 (a int);
CREATE TABLE replicated_table_func_test_4 (a macaddr);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
create_distributed_table
@ -659,7 +668,7 @@ SELECT create_distributed_table('replicated_table_func_test_4', 'a');
(1 row)
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test_4');
create_distributed_function
---------------------------------------------------------------------
@ -669,15 +678,15 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure;
table_and_function_colocated
---------------------------------------------------------------------
t
(1 row)
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- now, redistributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
create_distributed_function
---------------------------------------------------------------------
@ -686,17 +695,17 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure;
table_and_function_colocated
---------------------------------------------------------------------
t
(1 row)
-- function with a numeric dist. arg can be colocated with int
-- function with a macaddr8 dist. arg can be colocated with macaddr
-- column of a distributed table. In general, if there is a coercion
-- path, we rely on postgres for implicit coersions, and users for explicit coersions
-- to coerce the values
SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT create_distributed_function('eq8(macaddr8, macaddr8)', '$1', colocate_with:='replicated_table_func_test_4');
create_distributed_function
---------------------------------------------------------------------
@ -705,7 +714,7 @@ SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', coloca
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_numeric(numeric, numeric)'::regprocedure;
objects.objid = 'eq8(macaddr8, macaddr8)'::regprocedure;
table_and_function_colocated
---------------------------------------------------------------------
t
@ -733,8 +742,8 @@ ERROR: cannot colocate function "replicated_table_func_test_4" and table "add_p
-- without the colocate_with, the function errors out since there is no
-- default colocation group
SET citus.shard_count TO 55;
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
ERROR: cannot distribute the function "eq_with_param_names" since there is no table to colocate with
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
-- sync metadata to workers for consistent results when clearing objects
SELECT public.wait_until_metadata_sync();

View File

@ -484,7 +484,7 @@ CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") PARALLEL SAFE;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") PARALLEL SAFE;
INFO: Propagating deparsed query: ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(pg_catalog.regclass, pg_catalog."any") PARALLEL SAFE;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -608,7 +608,7 @@ CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
SELECT deparse_and_run_on_workers($cmd$
DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(),"CiTuS.TeeN"."TeeNFunCT10N.1!?!"(text);
$cmd$);
INFO: Propagating deparsed query: DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(), "CiTuS.TeeN"."TeeNFunCT10N.1!?!"(text);
INFO: Propagating deparsed query: DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(), "CiTuS.TeeN"."TeeNFunCT10N.1!?!"(pg_catalog.text);
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -629,7 +629,7 @@ SELECT create_distributed_function('func_default_param(INT)');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION func_default_param RENAME TO func_with_default_param;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_default_param(param integer) RENAME TO func_with_default_param;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_default_param(integer) RENAME TO func_with_default_param;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -650,7 +650,7 @@ SELECT create_distributed_function('func_out_param(INT)');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION func_out_param RENAME TO func_in_and_out_param;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_out_param(param integer, OUT result text) RENAME TO func_in_and_out_param;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_out_param(integer) RENAME TO func_in_and_out_param;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -674,7 +674,7 @@ SELECT create_distributed_function('square(NUMERIC)');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION square SET search_path TO DEFAULT;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.square(INOUT a numeric) SET search_path TO DEFAULT;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.square(numeric) SET search_path TO DEFAULT;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -705,7 +705,7 @@ SELECT create_distributed_function('sum_avg(NUMERIC[])');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION sum_avg COST 10000;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.sum_avg(VARIADIC list numeric[], OUT total numeric, OUT average numeric) COST 10000.000000;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.sum_avg(numeric[]) COST 10000.000000;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -727,7 +727,7 @@ SELECT create_distributed_function('func_custom_param(intpair)');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION func_custom_param RENAME TO func_with_custom_param;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_custom_param(param function_tests.intpair, OUT total integer) RENAME TO func_with_custom_param;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_custom_param(function_tests.intpair) RENAME TO func_with_custom_param;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------
@ -749,7 +749,7 @@ SELECT create_distributed_function('func_returns_table(INT)');
SELECT deparse_and_run_on_workers($cmd$
ALTER FUNCTION func_returns_table ROWS 100;
$cmd$);
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_returns_table(count integer) ROWS 100.000000;
INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_returns_table(integer) ROWS 100.000000;
CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE
deparse_and_run_on_workers
---------------------------------------------------------------------

View File

@ -10,14 +10,14 @@ SET search_path TO function_tests;
SET citus.shard_count TO 4;
-- Create and distribute a simple function
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric
AS 'select $1 + $2;'
CREATE FUNCTION eq8(macaddr8, macaddr8) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -41,9 +41,9 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
-- created just-in-time on function distribution
SET citus.enable_ddl_propagation TO off;
CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE TYPE dup_result AS (f1 macaddr, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
CREATE FUNCTION dup(macaddr) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
@ -51,8 +51,8 @@ CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION eq_with_param_names(val1 macaddr, val2 macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -63,8 +63,8 @@ CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION "add_mi'xed_param_names"(integer, "va'l1" integer) RETURNS integer
AS 'select $1 + $2;'
CREATE FUNCTION "eq_mi'xed_param_names"(macaddr, "va'l1" macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
@ -120,7 +120,7 @@ CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
-- Test deparsing multiple parameters with names
CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result)
RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$
select x.f1 + yz.f1, x.f2 || yz.f2;
select x.f1 | yz.f1, x.f2 || yz.f2;
$$;
CREATE FUNCTION agg_names_finalfunc(x dup_result)
@ -149,7 +149,7 @@ SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('statement_table','id');
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
CREATE TABLE streaming_table(id macaddr);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
@ -160,12 +160,12 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('"add_mi''xed_param_names"(int, int)');
SELECT create_distributed_function('"eq_mi''xed_param_names"(macaddr, macaddr)');
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mi''xed_param_names(int, int)'::regprocedure;
WHERE objid = 'eq_mi''xed_param_names(macaddr, macaddr)'::regprocedure;
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests."add_mi''xed_param_names"(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests."eq_mi'xed_param_names"('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
-- make sure that none of the active and primary nodes hasmetadata
-- since the function doesn't have a parameter
@ -181,12 +181,12 @@ SELECT create_distributed_function('increment(int2)', '$1');
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2;
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
-- distribute aggregate
SELECT create_distributed_function('sum2(int)');
@ -197,211 +197,218 @@ SELECT create_distributed_function('agg_names(dup_result,dup_result)');
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
-- Test SET/RESET for alter function
ALTER FUNCTION add(int,int) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET "citus.setting;'" TO 'hello '' world';
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RESET "citus.setting;'";
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET search_path TO 'sch'';ma', public;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RESET search_path;
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO warning;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO error;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) SET "citus.setting;'" TO 'hello '' world';
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RESET "citus.setting;'";
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RESET search_path;
-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT SECURITY DEFINER;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
-- rename function and make sure the new name can be used on the workers while the old name can't
ALTER FUNCTION add(int,int) RENAME TO add2;
SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2;
ALTER FUNCTION add2(int,int) RENAME TO add;
ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq2(macaddr,macaddr)');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012346789ab','012345689ab');$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq2('012345689ab','012345689ab');$$) ORDER BY 1,2;
ALTER ROUTINE eq2(macaddr,macaddr) RENAME TO eq;
ALTER AGGREGATE sum2(int) RENAME TO sum27;
SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2;
ALTER AGGREGATE sum27(int) RENAME TO sum2;
-- change the owner of the function and verify the owner has been changed on the workers
ALTER FUNCTION add(int,int) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
ALTER FUNCTION eq(macaddr,macaddr) OWNER TO functionuser;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
ALTER AGGREGATE sum2(int) OWNER TO functionuser;
ALTER ROUTINE my_rank("any") OWNER TO functionuser;
ALTER AGGREGATE my_rank("any") OWNER TO functionuser;
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
SELECT array_agg(row(usename, nspname, proname) order by proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'add';
$$);
SELECT run_command_on_workers($$
SELECT row(usename, nspname, proname)
FROM pg_proc
JOIN pg_user ON (usesysid = proowner)
JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests')
WHERE proname = 'sum2';
WHERE proname IN ('eq', 'sum2', 'my_rank');
$$);
-- change the schema of the function and verify the old schema doesn't exist anymore while
-- the new schema has the function.
ALTER FUNCTION add(int,int) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2;
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER FUNCTION eq(macaddr,macaddr) SET SCHEMA function_tests2;
SELECT public.verify_function_is_same_on_workers('function_tests2.eq(macaddr,macaddr)');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests2.eq('012345689ab','ba9876543210');$$) ORDER BY 1,2;
ALTER ROUTINE function_tests2.eq(macaddr,macaddr) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 <> $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','012345689ab');$$) ORDER BY 1,2;
-- distributed functions should not be allowed to depend on an extension, also functions
-- that depend on an extension should not be allowed to be distributed.
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;
ALTER FUNCTION eq(macaddr,macaddr) DEPENDS ON EXTENSION citus;
SELECT create_distributed_function('pg_catalog.citus_drop_trigger()');
DROP FUNCTION add(int,int);
DROP FUNCTION eq(macaddr,macaddr);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
-- Test DROP for ROUTINE
CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
select create_distributed_function('eq(macaddr,macaddr)');
DROP ROUTINE eq(macaddr, macaddr);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;
DROP AGGREGATE function_tests2.sum2(int);
-- call should fail as aggregate should have been dropped
SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2;
-- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
SELECT create_distributed_function('eq_with_param_names(val1 macaddr, macaddr)', 'val1');
-- invalid distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test');
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='test');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='int');
-- invalid distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '$0');
SELECT create_distributed_function('add_with_param_names(int, int)', '$-1');
SELECT create_distributed_function('add_with_param_names(int, int)', '$-10');
SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$0');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-10');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$3');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1a');
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'aaa');
-- NULL function
SELECT create_distributed_function(NULL);
-- NULL colocate_with
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL);
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', NULL);
-- empty string distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', '');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '');
-- The first distributed function syncs the metadata to nodes
-- and metadata syncing is not supported within transaction blocks
BEGIN;
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1');
ROLLBACK;
-- make sure that none of the nodes have the function because we've rollbacked
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
-- make sure that none of the active and primary nodes hasmetadata
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1');
-- make sure that the primary nodes are now metadata synced
select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
-- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1');
-- valid distribution with distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)','$1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
-- a function cannot be colocated with a table that is not "streaming" replicated
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table_func_test (a int);
CREATE TABLE replicated_table_func_test (a macaddr);
SET citus.replication_model TO "statement";
SELECT create_distributed_table('replicated_table_func_test', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test');
SELECT public.wait_until_metadata_sync();
-- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_2 (a bigint);
CREATE TABLE replicated_table_func_test_2 (a macaddr8);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_2', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_2');
-- colocate_with cannot be used without distribution key
SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_2');
-- a function cannot be colocated with a local table
CREATE TABLE replicated_table_func_test_3 (a bigint);
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
CREATE TABLE replicated_table_func_test_3 (a macaddr8);
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3');
-- a function cannot be colocated with a reference table
SELECT create_reference_table('replicated_table_func_test_3');
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3');
-- finally, colocate the function with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE replicated_table_func_test_4 (a int);
CREATE TABLE replicated_table_func_test_4 (a macaddr);
SET citus.replication_model TO "streaming";
SELECT create_distributed_table('replicated_table_func_test_4', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test_4');
-- show that the colocationIds are the same
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure;
-- now, re-distributed with the default colocation option, we should still see that the same colocation
-- now, redistributed with the default colocation option, we should still see that the same colocation
-- group preserved, because we're using the default shard creation settings
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure;
objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure;
-- function with a numeric dist. arg can be colocated with int
-- function with a macaddr8 dist. arg can be colocated with macaddr
-- column of a distributed table. In general, if there is a coercion
-- path, we rely on postgres for implicit coersions, and users for explicit coersions
-- to coerce the values
SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT create_distributed_function('eq8(macaddr8, macaddr8)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_numeric(numeric, numeric)'::regprocedure;
objects.objid = 'eq8(macaddr8, macaddr8)'::regprocedure;
SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:='replicated_table_func_test_4');
SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated
@ -416,7 +423,7 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca
-- without the colocate_with, the function errors out since there is no
-- default colocation group
SET citus.shard_count TO 55;
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
-- sync metadata to workers for consistent results when clearing objects
SELECT public.wait_until_metadata_sync();