From ccabf190902491e04400b05702cde1a1b45c42a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 7 Jan 2020 21:37:21 +0000 Subject: [PATCH] Propagate DROP ROUTINE, ALTER ROUTINE In two places I've made code more straight forward by using ROUTINE in our own codegen Two changes which may seem extraneous: AppendFunctionName was updated to not use pg_get_function_identity_arguments. This is because that function includes ORDER BY when printing an aggregate like my_rank. While ALTER AGGREGATE my_rank(x "any" ORDER BY y "any") is accepted by postgres, ALTER ROUTINE my_rank(x "any" ORDER BY y "any") is not. Tests were updated to use macaddr over integer. Using integer is flaky, our logic could sometimes end up on tables like users_table. I originally wanted to use money, but money isn't hashable. --- .../commands/distribute_object_ops.c | 60 ++++ src/backend/distributed/commands/function.c | 13 +- .../deparser/deparse_function_stmts.c | 88 ++--- .../deparser/qualify_function_stmt.c | 3 +- .../worker/worker_create_or_replace.c | 26 +- .../expected/distributed_functions.out | 321 +++++++++--------- .../expected/multi_deparse_function.out | 16 +- .../regress/sql/distributed_functions.sql | 227 +++++++------ 8 files changed, 383 insertions(+), 371 deletions(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 432604860..3118d5ad6 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -337,6 +337,41 @@ static DistributeObjectOps Procedure_Rename = { .postprocess = NULL, .address = RenameFunctionStmtObjectAddress, }; +static DistributeObjectOps Routine_AlterObjectDepends = { + .deparse = DeparseAlterFunctionDependsStmt, + .qualify = QualifyAlterFunctionDependsStmt, + .preprocess = PreprocessAlterFunctionDependsStmt, + .postprocess = NULL, + .address = AlterFunctionDependsStmtObjectAddress, +}; +static DistributeObjectOps Routine_AlterObjectSchema = { + .deparse = DeparseAlterFunctionSchemaStmt, + .qualify = QualifyAlterFunctionSchemaStmt, + .preprocess = PreprocessAlterFunctionSchemaStmt, + .postprocess = PostprocessAlterFunctionSchemaStmt, + .address = AlterFunctionSchemaStmtObjectAddress, +}; +static DistributeObjectOps Routine_AlterOwner = { + .deparse = DeparseAlterFunctionOwnerStmt, + .qualify = QualifyAlterFunctionOwnerStmt, + .preprocess = PreprocessAlterFunctionOwnerStmt, + .postprocess = NULL, + .address = AlterFunctionOwnerObjectAddress, +}; +static DistributeObjectOps Routine_Drop = { + .deparse = DeparseDropFunctionStmt, + .qualify = NULL, + .preprocess = PreprocessDropFunctionStmt, + .postprocess = NULL, + .address = NULL, +}; +static DistributeObjectOps Routine_Rename = { + .deparse = DeparseRenameFunctionStmt, + .qualify = QualifyRenameFunctionStmt, + .preprocess = PreprocessRenameFunctionStmt, + .postprocess = NULL, + .address = RenameFunctionStmtObjectAddress, +}; static DistributeObjectOps Schema_Drop = { .deparse = NULL, .qualify = NULL, @@ -440,6 +475,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_AlterObjectDepends; } + case OBJECT_ROUTINE: + { + return &Routine_AlterObjectDepends; + } + default: { return &NoDistributeOps; @@ -477,6 +517,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_AlterObjectSchema; } + case OBJECT_ROUTINE: + { + return &Routine_AlterObjectSchema; + } + case OBJECT_TYPE: { return &Type_AlterObjectSchema; @@ -514,6 +559,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_AlterOwner; } + case OBJECT_ROUTINE: + { + return &Routine_AlterOwner; + } + case OBJECT_TYPE: { return &Type_AlterOwner; @@ -665,6 +715,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_Drop; } + case OBJECT_ROUTINE: + { + return &Routine_Drop; + } + case OBJECT_SCHEMA: { return &Schema_Drop; @@ -732,6 +787,11 @@ GetDistributeObjectOps(Node *node) return &Procedure_Rename; } + case OBJECT_ROUTINE: + { + return &Routine_Rename; + } + case OBJECT_TYPE: { return &Type_Rename; diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 94aa05a5c..845a15fb6 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -592,7 +592,6 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) { HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid)); StringInfo alterCommand = makeStringInfo(); - char *kindString = "FUNCTION"; Oid procOwner = InvalidOid; @@ -602,15 +601,6 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) procOwner = procform->proowner; - if (procform->prokind == PROKIND_PROCEDURE) - { - kindString = "PROCEDURE"; - } - else if (procform->prokind == PROKIND_AGGREGATE) - { - kindString = "AGGREGATE"; - } - ReleaseSysCache(proctup); } else if (!OidIsValid(funcOid) || !HeapTupleIsValid(proctup)) @@ -625,8 +615,7 @@ GetFunctionAlterOwnerCommand(const RegProcedure funcOid) char *functionSignature = format_procedure_qualified(funcOid); char *functionOwner = GetUserNameFromId(procOwner, false); - appendStringInfo(alterCommand, "ALTER %s %s OWNER TO %s;", - kindString, + appendStringInfo(alterCommand, "ALTER ROUTINE %s OWNER TO %s;", functionSignature, quote_identifier(functionOwner)); diff --git a/src/backend/distributed/deparser/deparse_function_stmts.c b/src/backend/distributed/deparser/deparse_function_stmts.c index 95f23b949..f8e968ede 100644 --- a/src/backend/distributed/deparser/deparse_function_stmts.c +++ b/src/backend/distributed/deparser/deparse_function_stmts.c @@ -39,6 +39,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/regproc.h" /* forward declaration for deparse functions */ @@ -107,6 +108,11 @@ ObjectTypeToKeyword(ObjectType objtype) return "AGGREGATE"; } + case OBJECT_ROUTINE: + { + return "ROUTINE"; + } + default: elog(ERROR, "Unknown object type: %d", objtype); return NULL; @@ -623,13 +629,9 @@ AppendFunctionNameList(StringInfo buf, List *objects, ObjectType objtype) static void AppendFunctionName(StringInfo buf, ObjectWithArgs *func, ObjectType objtype) { - char *functionName = NULL; - char *schemaName = NULL; - Oid funcid = LookupFuncWithArgs(objtype, func, true); - HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); - if (!HeapTupleIsValid(proctup)) + if (funcid == InvalidOid) { /* * DROP FUNCTION IF EXISTS absent_function arrives here @@ -637,65 +639,33 @@ AppendFunctionName(StringInfo buf, ObjectWithArgs *func, ObjectType objtype) * There is no namespace associated with the nonexistent function, * thus we return the function name as it is provided */ + char *functionName = NULL; + char *schemaName = NULL; + DeconstructQualifiedName(func->objname, &schemaName, &functionName); + + char *qualifiedFunctionName = quote_qualified_identifier(schemaName, + functionName); + appendStringInfoString(buf, qualifiedFunctionName); + + if (!func->args_unspecified) + { + /* + * The function is not found, but there is an argument list specified, this has + * some known issues with the "any" type. However this is mostly a bug in + * postgres' TypeNameListToString. For now the best we can do until we understand + * the underlying cause better. + */ + + const char *args = TypeNameListToString(func->objargs); + appendStringInfo(buf, "(%s)", args); + } } else { - Form_pg_proc procform = (Form_pg_proc) GETSTRUCT(proctup); - functionName = NameStr(procform->proname); - functionName = pstrdup(functionName); /* we release the tuple before used */ - schemaName = get_namespace_name(procform->pronamespace); - - ReleaseSysCache(proctup); + char *functionSignature = format_procedure_qualified(funcid); + appendStringInfoString(buf, functionSignature); } - - char *qualifiedFunctionName = quote_qualified_identifier(schemaName, functionName); - appendStringInfoString(buf, qualifiedFunctionName); - - if (OidIsValid(funcid)) - { - /* - * If the function exists we want to use pg_get_function_identity_arguments to - * serialize its canonical arguments - */ - - /* - * Set search_path to NIL so that all objects outside of pg_catalog will be - * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; - */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - - PushOverrideSearchPath(overridePath); - - Datum sqlTextDatum = DirectFunctionCall1(pg_get_function_identity_arguments, - ObjectIdGetDatum(funcid)); - - /* revert back to original search_path */ - PopOverrideSearchPath(); - - const char *args = TextDatumGetCString(sqlTextDatum); - appendStringInfo(buf, "(%s)", args); - } - else if (!func->args_unspecified) - { - /* - * The function is not found, but there is an argument list specified, this has - * some known issues with the "any" type. However this is mostly a bug in - * postgres' TypeNameListToString. For now the best we can do until we understand - * the underlying cause better. - */ - - const char *args = TypeNameListToString(func->objargs); - appendStringInfo(buf, "(%s)", args); - } - - /* - * If the type is not found, and no argument list given we don't append anything here. - * This will cause mostly the same sql as the original statement. - */ } diff --git a/src/backend/distributed/deparser/qualify_function_stmt.c b/src/backend/distributed/deparser/qualify_function_stmt.c index 168ef0095..fbd6c17a0 100644 --- a/src/backend/distributed/deparser/qualify_function_stmt.c +++ b/src/backend/distributed/deparser/qualify_function_stmt.c @@ -38,7 +38,8 @@ AssertObjectTypeIsFunctional(ObjectType type) { Assert(type == OBJECT_AGGREGATE || type == OBJECT_FUNCTION || - type == OBJECT_PROCEDURE); + type == OBJECT_PROCEDURE || + type == OBJECT_ROUTINE); } diff --git a/src/backend/distributed/worker/worker_create_or_replace.c b/src/backend/distributed/worker/worker_create_or_replace.c index d6a5a1540..417aaead1 100644 --- a/src/backend/distributed/worker/worker_create_or_replace.c +++ b/src/backend/distributed/worker/worker_create_or_replace.c @@ -258,31 +258,7 @@ CreateRenameProcStmt(const ObjectAddress *address, char *newName) { RenameStmt *stmt = makeNode(RenameStmt); - switch (get_func_prokind(address->objectId)) - { - case PROKIND_AGGREGATE: - { - stmt->renameType = OBJECT_AGGREGATE; - break; - } - - case PROKIND_PROCEDURE: - { - stmt->renameType = OBJECT_PROCEDURE; - break; - } - - case PROKIND_FUNCTION: - { - stmt->renameType = OBJECT_FUNCTION; - break; - } - - default: - elog(ERROR, "Unexpected prokind"); - return NULL; - } - + stmt->renameType = OBJECT_ROUTINE; stmt->object = (Node *) ObjectWithArgsFromOid(address->objectId); stmt->newname = newName; diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 6e63d15a2..5e25f0a4f 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -14,13 +14,13 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; -- Create and distribute a simple function -CREATE FUNCTION add(integer, integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric - AS 'select $1 + $2;' +CREATE FUNCTION eq8(macaddr8, macaddr8) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -40,15 +40,15 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int -- This will prevent the workers from having those types created. They are -- created just-in-time on function distribution SET citus.enable_ddl_propagation TO off; -CREATE TYPE dup_result AS (f1 int, f2 text); -CREATE FUNCTION dup(int) RETURNS dup_result +CREATE TYPE dup_result AS (f1 macaddr, f2 text); +CREATE FUNCTION dup(macaddr) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; CREATE FUNCTION increment(int2) RETURNS int AS $$ SELECT $1 + 1$$ LANGUAGE SQL; -CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION eq_with_param_names(val1 macaddr, val2 macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -57,8 +57,8 @@ CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION "add_mi'xed_param_names"(integer, "va'l1" integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION "eq_mi'xed_param_names"(macaddr, "va'l1" macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -108,7 +108,7 @@ CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") ( -- Test deparsing multiple parameters with names CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result) RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$ - select x.f1 + yz.f1, x.f2 || yz.f2; + select x.f1 | yz.f1, x.f2 || yz.f2; $$; CREATE FUNCTION agg_names_finalfunc(x dup_result) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ @@ -136,7 +136,7 @@ SELECT create_distributed_table('statement_table','id'); (1 row) -- create a table uses streaming-based replication (can be synced) -CREATE TABLE streaming_table(id int); +CREATE TABLE streaming_table(id macaddr); SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('streaming_table','id'); @@ -155,25 +155,25 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr -- if not paremeters are supplied, we'd see that function doesn't have -- distribution_argument_index and colocationid -SELECT create_distributed_function('"add_mi''xed_param_names"(int, int)'); +SELECT create_distributed_function('"eq_mi''xed_param_names"(macaddr, macaddr)'); create_distributed_function --------------------------------------------------------------------- (1 row) SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mi''xed_param_names(int, int)'::regprocedure; +WHERE objid = 'eq_mi''xed_param_names(macaddr, macaddr)'::regprocedure; ?column? | ?column? --------------------------------------------------------------------- t | t (1 row) -- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests."add_mi''xed_param_names"(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests."eq_mi'xed_param_names"('0123456789ab','ba9876543210');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 + localhost | 57637 | t | f + localhost | 57638 | t | f (2 rows) -- make sure that none of the active and primary nodes hasmetadata @@ -201,33 +201,33 @@ ERROR: cannot create a function with a distribution argument when citus.replica HINT: Set citus.replication_model to 'streaming' before creating distributed tables END; -- try to co-locate with a table that uses streaming replication -SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table'); +SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table'); create_distributed_function --------------------------------------------------------------------- (1 row) -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; - nodename | nodeport | success | result +SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2; + nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | (42,"42 is text") - localhost | 57638 | t | (42,"42 is text") + localhost | 57637 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text") + localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text") (2 rows) -SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table'); +SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); create_distributed_function --------------------------------------------------------------------- (1 row) -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 + localhost | 57637 | t | f + localhost | 57638 | t | f (2 rows) -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t @@ -255,129 +255,129 @@ SELECT create_distributed_function('agg_names(dup_result,dup_result)'); -- testing alter statements for a distributed function -- ROWS 5, untested because; -- ERROR: ROWS is not applicable when function does not return a set -ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -- Test SET/RESET for alter function -ALTER FUNCTION add(int,int) SET client_min_messages TO warning; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) SET client_min_messages TO error; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) SET client_min_messages TO debug; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) RESET client_min_messages; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) SET "citus.setting;'" TO 'hello '' world'; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET "citus.setting;'" TO 'hello '' world'; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) RESET "citus.setting;'"; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET "citus.setting;'"; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) SET search_path TO 'sch'';ma', public; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) RESET search_path; +ALTER FUNCTION eq(macaddr,macaddr) RESET search_path; -- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs -ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT; +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT; ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER; +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT SECURITY DEFINER; ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -- rename function and make sure the new name can be used on the workers while the old name can't -ALTER FUNCTION add(int,int) RENAME TO add2; -SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2; +SELECT public.verify_function_is_same_on_workers('function_tests.eq2(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012346789ab','012345689ab');$$) ORDER BY 1,2; + nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist - localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist + localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist (2 rows) -SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq2('012345689ab','012345689ab');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 + localhost | 57637 | t | t + localhost | 57638 | t | t (2 rows) -ALTER FUNCTION add2(int,int) RENAME TO add; +ALTER ROUTINE eq2(macaddr,macaddr) RENAME TO eq; ALTER AGGREGATE sum2(int) RENAME TO sum27; SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2; nodename | nodeport | success | result @@ -388,99 +388,109 @@ SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'su ALTER AGGREGATE sum27(int) RENAME TO sum2; -- change the owner of the function and verify the owner has been changed on the workers -ALTER FUNCTION add(int,int) OWNER TO functionuser; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) OWNER TO functionuser; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) ALTER AGGREGATE sum2(int) OWNER TO functionuser; +ALTER ROUTINE my_rank("any") OWNER TO functionuser; +ALTER AGGREGATE my_rank("any") OWNER TO functionuser; SELECT run_command_on_workers($$ -SELECT row(usename, nspname, proname) +SELECT array_agg(row(usename, nspname, proname) order by proname) FROM pg_proc JOIN pg_user ON (usesysid = proowner) JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests') -WHERE proname = 'add'; +WHERE proname IN ('eq', 'sum2', 'my_rank'); $$); - run_command_on_workers + run_command_on_workers --------------------------------------------------------------------- - (localhost,57637,t,"(functionuser,function_tests,add)") - (localhost,57638,t,"(functionuser,function_tests,add)") -(2 rows) - -SELECT run_command_on_workers($$ -SELECT row(usename, nspname, proname) -FROM pg_proc -JOIN pg_user ON (usesysid = proowner) -JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests') -WHERE proname = 'sum2'; -$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"(functionuser,function_tests,sum2)") - (localhost,57638,t,"(functionuser,function_tests,sum2)") + (localhost,57637,t,"{""(functionuser,function_tests,eq)"",""(functionuser,function_tests,my_rank)"",""(functionuser,function_tests,sum2)""}") + (localhost,57638,t,"{""(functionuser,function_tests,eq)"",""(functionuser,function_tests,my_rank)"",""(functionuser,function_tests,sum2)""}") (2 rows) -- change the schema of the function and verify the old schema doesn't exist anymore while -- the new schema has the function. -ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; -SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET SCHEMA function_tests2; +SELECT public.verify_function_is_same_on_workers('function_tests2.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; + nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist - localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist + localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist (2 rows) -SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests2.eq('012345689ab','ba9876543210');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 + localhost | 57637 | t | f + localhost | 57638 | t | f (2 rows) -ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +ALTER ROUTINE function_tests2.eq(macaddr,macaddr) SET SCHEMA function_tests; ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator -CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer -AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded +CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool +AS 'select $1 <> $2;' -- I know, this is not an add, but the output will tell us if the update succeeded LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); verify_function_is_same_on_workers --------------------------------------------------------------------- t (1 row) -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','012345689ab');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | t | 6 - localhost | 57638 | t | 6 + localhost | 57637 | t | f + localhost | 57638 | t | f (2 rows) -- distributed functions should not be allowed to depend on an extension, also functions -- that depend on an extension should not be allowed to be distributed. -ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus; +ALTER FUNCTION eq(macaddr,macaddr) DEPENDS ON EXTENSION citus; ERROR: distrtibuted functions are not allowed to depend on an extension -DETAIL: Function "function_tests.add(integer,integer)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on. +DETAIL: Function "function_tests.eq(pg_catalog.macaddr,pg_catalog.macaddr)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on. SELECT create_distributed_function('pg_catalog.citus_drop_trigger()'); ERROR: unable to create a distributed function from functions owned by an extension DETAIL: Function "pg_catalog.citus_drop_trigger()" has a dependency on extension "citus". Functions depending on an extension cannot be distributed. Create the function by creating the extension on the workers. -DROP FUNCTION add(int,int); +DROP FUNCTION eq(macaddr,macaddr); -- call should fail as function should have been dropped -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; + nodename | nodeport | success | result --------------------------------------------------------------------- - localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist - localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist + localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist +(2 rows) + +-- Test DROP for ROUTINE +CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool +AS 'select $1 = $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +select create_distributed_function('eq(macaddr,macaddr)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +DROP ROUTINE eq(macaddr, macaddr); +-- call should fail as function should have been dropped +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; + nodename | nodeport | success | result +--------------------------------------------------------------------- + localhost | 57637 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist + localhost | 57638 | f | ERROR: function function_tests.eq(unknown, unknown) does not exist (2 rows) DROP AGGREGATE function_tests2.sum2(int); @@ -493,51 +503,50 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (sele (2 rows) -- postgres doesn't accept parameter names in the regprocedure input -SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); -ERROR: syntax error at or near "int" -CONTEXT: invalid type name "val1 int" +SELECT create_distributed_function('eq_with_param_names(val1 macaddr, macaddr)', 'val1'); +ERROR: invalid type name "val1 macaddr" -- invalid distribution_arg_name -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='test'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='int'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -- invalid distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)', '$0'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$0'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -SELECT create_distributed_function('add_with_param_names(int, int)', '$-1'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-1'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -SELECT create_distributed_function('add_with_param_names(int, int)', '$-10'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-10'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$3'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -SELECT create_distributed_function('add_with_param_names(int, int)', '$1a'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1a'); ERROR: invalid input syntax for integer: "1a" -- non existing column name -SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'aaa'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -- NULL function SELECT create_distributed_function(NULL); ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL -- NULL colocate_with -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', NULL); ERROR: colocate_with parameter should not be NULL HINT: To use the default value, set colocate_with option to "default" -- empty string distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)', ''); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', ''); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() -- The first distributed function syncs the metadata to nodes -- and metadata syncing is not supported within transaction blocks BEGIN; - SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); create_distributed_function --------------------------------------------------------------------- @@ -545,7 +554,7 @@ BEGIN; ROLLBACK; -- make sure that none of the nodes have the function because we've rollbacked -SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,0) @@ -560,7 +569,7 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr (1 row) -- valid distribution with distribution_arg_name -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); create_distributed_function --------------------------------------------------------------------- @@ -574,7 +583,7 @@ select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'p (1 row) -- make sure that both of the nodes have the function because we've succeeded -SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,1) @@ -582,14 +591,14 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_ (2 rows) -- valid distribution with distribution_arg_name -- case insensitive -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1'); create_distributed_function --------------------------------------------------------------------- (1 row) -- valid distribution with distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)','$1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1'); create_distributed_function --------------------------------------------------------------------- @@ -597,7 +606,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)','$1'); -- a function cannot be colocated with a table that is not "streaming" replicated SET citus.shard_replication_factor TO 2; -CREATE TABLE replicated_table_func_test (a int); +CREATE TABLE replicated_table_func_test (a macaddr); SET citus.replication_model TO "statement"; SELECT create_distributed_table('replicated_table_func_test', 'a'); create_distributed_table @@ -605,8 +614,8 @@ SELECT create_distributed_table('replicated_table_func_test', 'a'); (1 row) -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); -ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test" +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test'); +ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test" DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. HINT: When distributing tables make sure that citus.replication_model = 'streaming' SELECT public.wait_until_metadata_sync(); @@ -618,7 +627,7 @@ SELECT public.wait_until_metadata_sync(); -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; -CREATE TABLE replicated_table_func_test_2 (a bigint); +CREATE TABLE replicated_table_func_test_2 (a macaddr8); SET citus.replication_model TO "streaming"; SELECT create_distributed_table('replicated_table_func_test_2', 'a'); create_distributed_table @@ -626,19 +635,19 @@ SELECT create_distributed_table('replicated_table_func_test_2', 'a'); (1 row) -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_2'); create_distributed_function --------------------------------------------------------------------- (1 row) -- colocate_with cannot be used without distribution key -SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2'); -ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_2'); +ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided -- a function cannot be colocated with a local table -CREATE TABLE replicated_table_func_test_3 (a bigint); -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); +CREATE TABLE replicated_table_func_test_3 (a macaddr8); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); ERROR: relation replicated_table_func_test_3 is not distributed -- a function cannot be colocated with a reference table SELECT create_reference_table('replicated_table_func_test_3'); @@ -647,11 +656,11 @@ SELECT create_reference_table('replicated_table_func_test_3'); (1 row) -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); -ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); +ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables. -- finally, colocate the function with a distributed table SET citus.shard_replication_factor TO 1; -CREATE TABLE replicated_table_func_test_4 (a int); +CREATE TABLE replicated_table_func_test_4 (a macaddr); SET citus.replication_model TO "streaming"; SELECT create_distributed_table('replicated_table_func_test_4', 'a'); create_distributed_table @@ -659,7 +668,7 @@ SELECT create_distributed_table('replicated_table_func_test_4', 'a'); (1 row) -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test_4'); create_distributed_function --------------------------------------------------------------------- @@ -669,15 +678,15 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_with_param_names(int, int)'::regprocedure; + objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure; table_and_function_colocated --------------------------------------------------------------------- t (1 row) --- now, re-distributed with the default colocation option, we should still see that the same colocation +-- now, redistributed with the default colocation option, we should still see that the same colocation -- group preserved, because we're using the default shard creation settings -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); create_distributed_function --------------------------------------------------------------------- @@ -686,17 +695,17 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_with_param_names(int, int)'::regprocedure; + objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure; table_and_function_colocated --------------------------------------------------------------------- t (1 row) --- function with a numeric dist. arg can be colocated with int +-- function with a macaddr8 dist. arg can be colocated with macaddr -- column of a distributed table. In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions -- to coerce the values -SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT create_distributed_function('eq8(macaddr8, macaddr8)', '$1', colocate_with:='replicated_table_func_test_4'); create_distributed_function --------------------------------------------------------------------- @@ -705,7 +714,7 @@ SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', coloca SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; + objects.objid = 'eq8(macaddr8, macaddr8)'::regprocedure; table_and_function_colocated --------------------------------------------------------------------- t @@ -733,8 +742,8 @@ ERROR: cannot colocate function "replicated_table_func_test_4" and table "add_p -- without the colocate_with, the function errors out since there is no -- default colocation group SET citus.shard_count TO 55; -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); -ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); +ERROR: cannot distribute the function "eq_with_param_names" since there is no table to colocate with HINT: Provide a distributed table via "colocate_with" option to create_distributed_function() -- sync metadata to workers for consistent results when clearing objects SELECT public.wait_until_metadata_sync(); diff --git a/src/test/regress/expected/multi_deparse_function.out b/src/test/regress/expected/multi_deparse_function.out index 2af87f91e..e1448d5f1 100644 --- a/src/test/regress/expected/multi_deparse_function.out +++ b/src/test/regress/expected/multi_deparse_function.out @@ -484,7 +484,7 @@ CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") PARALLEL SAFE; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") PARALLEL SAFE; +INFO: Propagating deparsed query: ALTER FUNCTION pg_catalog.get_shard_id_for_distribution_column(pg_catalog.regclass, pg_catalog."any") PARALLEL SAFE; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -608,7 +608,7 @@ CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE SELECT deparse_and_run_on_workers($cmd$ DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(),"CiTuS.TeeN"."TeeNFunCT10N.1!?!"(text); $cmd$); -INFO: Propagating deparsed query: DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(), "CiTuS.TeeN"."TeeNFunCT10N.1!?!"(text); +INFO: Propagating deparsed query: DROP FUNCTION "CiTUS.TEEN2"."TeeNFunCT10N.1!?!"(), "CiTuS.TeeN"."TeeNFunCT10N.1!?!"(pg_catalog.text); CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -629,7 +629,7 @@ SELECT create_distributed_function('func_default_param(INT)'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION func_default_param RENAME TO func_with_default_param; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_default_param(param integer) RENAME TO func_with_default_param; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_default_param(integer) RENAME TO func_with_default_param; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -650,7 +650,7 @@ SELECT create_distributed_function('func_out_param(INT)'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION func_out_param RENAME TO func_in_and_out_param; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_out_param(param integer, OUT result text) RENAME TO func_in_and_out_param; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_out_param(integer) RENAME TO func_in_and_out_param; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -674,7 +674,7 @@ SELECT create_distributed_function('square(NUMERIC)'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION square SET search_path TO DEFAULT; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.square(INOUT a numeric) SET search_path TO DEFAULT; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.square(numeric) SET search_path TO DEFAULT; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -705,7 +705,7 @@ SELECT create_distributed_function('sum_avg(NUMERIC[])'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION sum_avg COST 10000; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.sum_avg(VARIADIC list numeric[], OUT total numeric, OUT average numeric) COST 10000.000000; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.sum_avg(numeric[]) COST 10000.000000; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -727,7 +727,7 @@ SELECT create_distributed_function('func_custom_param(intpair)'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION func_custom_param RENAME TO func_with_custom_param; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_custom_param(param function_tests.intpair, OUT total integer) RENAME TO func_with_custom_param; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_custom_param(function_tests.intpair) RENAME TO func_with_custom_param; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- @@ -749,7 +749,7 @@ SELECT create_distributed_function('func_returns_table(INT)'); SELECT deparse_and_run_on_workers($cmd$ ALTER FUNCTION func_returns_table ROWS 100; $cmd$); -INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_returns_table(count integer) ROWS 100.000000; +INFO: Propagating deparsed query: ALTER FUNCTION function_tests.func_returns_table(integer) ROWS 100.000000; CONTEXT: PL/pgSQL function deparse_and_run_on_workers(text) line 6 at RAISE deparse_and_run_on_workers --------------------------------------------------------------------- diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 040a05aa7..b4d479260 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -10,14 +10,14 @@ SET search_path TO function_tests; SET citus.shard_count TO 4; -- Create and distribute a simple function -CREATE FUNCTION add(integer, integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION add_numeric(numeric, numeric) RETURNS numeric - AS 'select $1 + $2;' +CREATE FUNCTION eq8(macaddr8, macaddr8) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -41,9 +41,9 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int -- created just-in-time on function distribution SET citus.enable_ddl_propagation TO off; -CREATE TYPE dup_result AS (f1 int, f2 text); +CREATE TYPE dup_result AS (f1 macaddr, f2 text); -CREATE FUNCTION dup(int) RETURNS dup_result +CREATE FUNCTION dup(macaddr) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; @@ -51,8 +51,8 @@ CREATE FUNCTION increment(int2) RETURNS int AS $$ SELECT $1 + 1$$ LANGUAGE SQL; -CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION eq_with_param_names(val1 macaddr, val2 macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -63,8 +63,8 @@ CREATE FUNCTION add_without_param_names(integer, integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; -CREATE FUNCTION "add_mi'xed_param_names"(integer, "va'l1" integer) RETURNS integer - AS 'select $1 + $2;' +CREATE FUNCTION "eq_mi'xed_param_names"(macaddr, "va'l1" macaddr) RETURNS bool + AS 'select $1 = $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; @@ -120,7 +120,7 @@ CREATE AGGREGATE my_rank(VARIADIC "any" ORDER BY VARIADIC "any") ( -- Test deparsing multiple parameters with names CREATE FUNCTION agg_names_sfunc(state dup_result, x dup_result, yz dup_result) RETURNS dup_result IMMUTABLE STRICT LANGUAGE sql AS $$ - select x.f1 + yz.f1, x.f2 || yz.f2; + select x.f1 | yz.f1, x.f2 || yz.f2; $$; CREATE FUNCTION agg_names_finalfunc(x dup_result) @@ -149,7 +149,7 @@ SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('statement_table','id'); -- create a table uses streaming-based replication (can be synced) -CREATE TABLE streaming_table(id int); +CREATE TABLE streaming_table(id macaddr); SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('streaming_table','id'); @@ -160,12 +160,12 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr -- if not paremeters are supplied, we'd see that function doesn't have -- distribution_argument_index and colocationid -SELECT create_distributed_function('"add_mi''xed_param_names"(int, int)'); +SELECT create_distributed_function('"eq_mi''xed_param_names"(macaddr, macaddr)'); SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mi''xed_param_names(int, int)'::regprocedure; +WHERE objid = 'eq_mi''xed_param_names(macaddr, macaddr)'::regprocedure; -- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests."add_mi''xed_param_names"(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests."eq_mi'xed_param_names"('0123456789ab','ba9876543210');$$) ORDER BY 1,2; -- make sure that none of the active and primary nodes hasmetadata -- since the function doesn't have a parameter @@ -181,12 +181,12 @@ SELECT create_distributed_function('increment(int2)', '$1'); END; -- try to co-locate with a table that uses streaming replication -SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table'); -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; +SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table'); +SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2; -SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); -- distribute aggregate SELECT create_distributed_function('sum2(int)'); @@ -197,211 +197,218 @@ SELECT create_distributed_function('agg_names(dup_result,dup_result)'); -- testing alter statements for a distributed function -- ROWS 5, untested because; -- ERROR: ROWS is not applicable when function does not return a set -ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); -- Test SET/RESET for alter function -ALTER FUNCTION add(int,int) SET client_min_messages TO warning; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) SET client_min_messages TO error; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) SET client_min_messages TO debug; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) RESET client_min_messages; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) SET "citus.setting;'" TO 'hello '' world'; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) RESET "citus.setting;'"; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) SET search_path TO 'sch'';ma', public; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) RESET search_path; +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) SET "citus.setting;'" TO 'hello '' world'; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET "citus.setting;'"; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET search_path; -- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs -ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) SET client_min_messages FROM CURRENT SECURITY DEFINER; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); -- rename function and make sure the new name can be used on the workers while the old name can't -ALTER FUNCTION add(int,int) RENAME TO add2; -SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; -SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; -ALTER FUNCTION add2(int,int) RENAME TO add; +ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2; +SELECT public.verify_function_is_same_on_workers('function_tests.eq2(macaddr,macaddr)'); +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012346789ab','012345689ab');$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq2('012345689ab','012345689ab');$$) ORDER BY 1,2; +ALTER ROUTINE eq2(macaddr,macaddr) RENAME TO eq; ALTER AGGREGATE sum2(int) RENAME TO sum27; SELECT * FROM run_command_on_workers($$SELECT 1 from pg_proc where proname = 'sum27';$$) ORDER BY 1,2; ALTER AGGREGATE sum27(int) RENAME TO sum2; -- change the owner of the function and verify the owner has been changed on the workers -ALTER FUNCTION add(int,int) OWNER TO functionuser; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION eq(macaddr,macaddr) OWNER TO functionuser; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); ALTER AGGREGATE sum2(int) OWNER TO functionuser; +ALTER ROUTINE my_rank("any") OWNER TO functionuser; +ALTER AGGREGATE my_rank("any") OWNER TO functionuser; SELECT run_command_on_workers($$ -SELECT row(usename, nspname, proname) +SELECT array_agg(row(usename, nspname, proname) order by proname) FROM pg_proc JOIN pg_user ON (usesysid = proowner) JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests') -WHERE proname = 'add'; -$$); -SELECT run_command_on_workers($$ -SELECT row(usename, nspname, proname) -FROM pg_proc -JOIN pg_user ON (usesysid = proowner) -JOIN pg_namespace ON (pg_namespace.oid = pronamespace and nspname = 'function_tests') -WHERE proname = 'sum2'; +WHERE proname IN ('eq', 'sum2', 'my_rank'); $$); -- change the schema of the function and verify the old schema doesn't exist anymore while -- the new schema has the function. -ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; -SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; -SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; -ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +ALTER FUNCTION eq(macaddr,macaddr) SET SCHEMA function_tests2; +SELECT public.verify_function_is_same_on_workers('function_tests2.eq(macaddr,macaddr)'); +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests2.eq('012345689ab','ba9876543210');$$) ORDER BY 1,2; +ALTER ROUTINE function_tests2.eq(macaddr,macaddr) SET SCHEMA function_tests; ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator -CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer -AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded +CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool +AS 'select $1 <> $2;' -- I know, this is not an add, but the output will tell us if the update succeeded LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','012345689ab');$$) ORDER BY 1,2; -- distributed functions should not be allowed to depend on an extension, also functions -- that depend on an extension should not be allowed to be distributed. -ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus; +ALTER FUNCTION eq(macaddr,macaddr) DEPENDS ON EXTENSION citus; SELECT create_distributed_function('pg_catalog.citus_drop_trigger()'); -DROP FUNCTION add(int,int); +DROP FUNCTION eq(macaddr,macaddr); -- call should fail as function should have been dropped -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; + +-- Test DROP for ROUTINE +CREATE OR REPLACE FUNCTION eq(macaddr, macaddr) RETURNS bool +AS 'select $1 = $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +select create_distributed_function('eq(macaddr,macaddr)'); + +DROP ROUTINE eq(macaddr, macaddr); +-- call should fail as function should have been dropped +SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2; DROP AGGREGATE function_tests2.sum2(int); -- call should fail as aggregate should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests2.sum2(id) FROM (select 1 id, 2) subq;') ORDER BY 1,2; -- postgres doesn't accept parameter names in the regprocedure input -SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); +SELECT create_distributed_function('eq_with_param_names(val1 macaddr, macaddr)', 'val1'); -- invalid distribution_arg_name -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='test'); -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='int'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='test'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='int'); -- invalid distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)', '$0'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$-1'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$-10'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$1a'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$0'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$-10'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$3'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1a'); -- non existing column name -SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'aaa'); -- NULL function SELECT create_distributed_function(NULL); -- NULL colocate_with -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', NULL); -- empty string distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)', ''); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', ''); -- The first distributed function syncs the metadata to nodes -- and metadata syncing is not supported within transaction blocks BEGIN; - SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); ROLLBACK; -- make sure that none of the nodes have the function because we've rollbacked -SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); -- make sure that none of the active and primary nodes hasmetadata select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; -- valid distribution with distribution_arg_name -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); -- make sure that the primary nodes are now metadata synced select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; -- make sure that both of the nodes have the function because we've succeeded -SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); -- valid distribution with distribution_arg_name -- case insensitive -SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1'); -- valid distribution with distribution_arg_index -SELECT create_distributed_function('add_with_param_names(int, int)','$1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1'); -- a function cannot be colocated with a table that is not "streaming" replicated SET citus.shard_replication_factor TO 2; -CREATE TABLE replicated_table_func_test (a int); +CREATE TABLE replicated_table_func_test (a macaddr); SET citus.replication_model TO "statement"; SELECT create_distributed_table('replicated_table_func_test', 'a'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test'); SELECT public.wait_until_metadata_sync(); -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; -CREATE TABLE replicated_table_func_test_2 (a bigint); +CREATE TABLE replicated_table_func_test_2 (a macaddr8); SET citus.replication_model TO "streaming"; SELECT create_distributed_table('replicated_table_func_test_2', 'a'); -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_2'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_2'); -- colocate_with cannot be used without distribution key -SELECT create_distributed_function('add_with_param_names(int, int)', colocate_with:='replicated_table_func_test_2'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_2'); -- a function cannot be colocated with a local table -CREATE TABLE replicated_table_func_test_3 (a bigint); -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); +CREATE TABLE replicated_table_func_test_3 (a macaddr8); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); -- a function cannot be colocated with a reference table SELECT create_reference_table('replicated_table_func_test_3'); -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1', colocate_with:='replicated_table_func_test_3'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); -- finally, colocate the function with a distributed table SET citus.shard_replication_factor TO 1; -CREATE TABLE replicated_table_func_test_4 (a int); +CREATE TABLE replicated_table_func_test_4 (a macaddr); SET citus.replication_model TO "streaming"; SELECT create_distributed_table('replicated_table_func_test_4', 'a'); -SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test_4'); -- show that the colocationIds are the same SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_with_param_names(int, int)'::regprocedure; + objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure; --- now, re-distributed with the default colocation option, we should still see that the same colocation +-- now, redistributed with the default colocation option, we should still see that the same colocation -- group preserved, because we're using the default shard creation settings -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_with_param_names(int, int)'::regprocedure; + objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure; --- function with a numeric dist. arg can be colocated with int +-- function with a macaddr8 dist. arg can be colocated with macaddr -- column of a distributed table. In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions -- to coerce the values -SELECT create_distributed_function('add_numeric(numeric, numeric)', '$1', colocate_with:='replicated_table_func_test_4'); +SELECT create_distributed_function('eq8(macaddr8, macaddr8)', '$1', colocate_with:='replicated_table_func_test_4'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND - objects.objid = 'add_numeric(numeric, numeric)'::regprocedure; + objects.objid = 'eq8(macaddr8, macaddr8)'::regprocedure; SELECT create_distributed_function('add_text(text, text)', '$1', colocate_with:='replicated_table_func_test_4'); SELECT pg_dist_partition.colocationid = objects.colocationid as table_and_function_colocated @@ -416,7 +423,7 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca -- without the colocate_with, the function errors out since there is no -- default colocation group SET citus.shard_count TO 55; -SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); -- sync metadata to workers for consistent results when clearing objects SELECT public.wait_until_metadata_sync();