diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 2c04a9cb0..079f74a91 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -66,6 +66,7 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); static void TriggerSyncMetadataToPrimaryNodes(void); +static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static ObjectAddress * FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, @@ -631,6 +632,57 @@ TriggerSyncMetadataToPrimaryNodes(void) } +/* + * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION + * statement. We only propagate replace's of distributed functions to keep the function on + * the workers in sync with the one on the coordinator. + */ +static bool +ShouldPropagateCreateFunction(CreateFunctionStmt *stmt) +{ + const ObjectAddress *address = NULL; + + if (creating_extension) + { + /* + * extensions should be created separately on the workers, functions cascading + * from an extension should therefore not be propagated. + */ + return false; + } + + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return false; + } + + if (!stmt->replace) + { + /* + * Since we only care for a replace of distributed functions if the statement is + * not a replace we are going to ignore. + */ + return false; + } + + /* + * Even though its a replace we should accept an non-existing function, it will just + * not be distributed + */ + address = GetObjectAddressFromParseTree((Node *) stmt, true); + if (!IsObjectDistributed(address)) + { + /* do not propagate alter function for non-distributed functions */ + return false; + } + + return true; +} + + /* * ShouldPropagateAlterFunction returns, based on the address of a function, if alter * statements targeting the function should be propagated. @@ -665,6 +717,102 @@ ShouldPropagateAlterFunction(const ObjectAddress *address) } +/* + * PlanCreateFunctionStmt is called during the planning phase for CREATE [OR REPLACE] + * FUNCTION. We primarily care for the replace variant of this statement to keep + * distributed functions in sync. We bail via a check on ShouldPropagateCreateFunction + * which checks for the OR REPLACE modifier. + * + * Since we use pg_get_functiondef to get the ddl command we actually do not do any + * planning here, instead we defer the plan creation to the processing step. + * + * Instead we do our basic housekeeping where we make sure we are on the coordinator and + * can propagate the function in sequential mode. + */ +List * +PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString) +{ + if (!ShouldPropagateCreateFunction(stmt)) + { + return NIL; + } + + EnsureCoordinator(); + + EnsureSequentialModeForFunctionDDL(); + + /* + * ddl jobs will be generated during the Processing phase as we need the function to + * be updated in the catalog to get its sql representation + */ + return NIL; +} + + +/* + * ProcessCreateFunctionStmt actually creates the plan we need to execute for function + * propagation. This is the downside of using pg_get_functiondef to get the sql statement. + * + * Besides creating the plan we also make sure all (new) dependencies of the function are + * created on all nodes. + */ +List * +ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString) +{ + const ObjectAddress *address = NULL; + const char *sql = NULL; + List *commands = NIL; + + if (!ShouldPropagateCreateFunction(stmt)) + { + return NIL; + } + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + EnsureDependenciesExistsOnAllNodes(address); + + sql = GetFunctionDDLCommand(address->objectId); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * CreateFunctionStmtObjectAddress returns the ObjectAddress for the subject of the + * CREATE [OR REPLACE] FUNCTION statement. If missing_ok is false it will error with the + * normal postgres error for unfound functions. + */ +const ObjectAddress * +CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, bool missing_ok) +{ + ObjectType objectType = OBJECT_FUNCTION; + ObjectWithArgs *objectWithArgs = NULL; + ListCell *parameterCell = NULL; + +#if PG_VERSION_NUM > 110000 + if (stmt->is_procedure) + { + objectType = OBJECT_PROCEDURE; + } +#endif + + objectWithArgs = makeNode(ObjectWithArgs); + objectWithArgs->objname = stmt->funcname; + + foreach(parameterCell, stmt->parameters) + { + FunctionParameter *funcParam = castNode(FunctionParameter, lfirst(parameterCell)); + objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType); + } + + return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok); +} + + /* * PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we * plan the jobs to be executed on the workers for functions that have been distributed in diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index fc02c5084..3ed5eabfd 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -554,6 +554,12 @@ multi_ProcessUtility(PlannedStmt *pstmt, queryString); } + if (IsA(parsetree, CreateFunctionStmt)) + { + ddlJobs = PlanCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree), + queryString); + } + /* * ALTER TABLE ALL IN TABLESPACE statements have their node type as * AlterTableMoveAllStmt. At the moment we do not support this functionality in @@ -706,6 +712,13 @@ multi_ProcessUtility(PlannedStmt *pstmt, { ProcessAlterEnumStmt(castNode(AlterEnumStmt, parsetree), queryString); } + + if (IsA(parsetree, CreateFunctionStmt)) + { + Assert(ddlJobs == NIL); /* jobs should not have been set before */ + ddlJobs = ProcessCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree), + queryString); + } } /* diff --git a/src/backend/distributed/deparser/objectaddress.c b/src/backend/distributed/deparser/objectaddress.c index 267603a8a..9b69ccfa3 100644 --- a/src/backend/distributed/deparser/objectaddress.c +++ b/src/backend/distributed/deparser/objectaddress.c @@ -82,6 +82,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok) missing_ok); } + case T_CreateFunctionStmt: + { + return CreateFunctionStmtObjectAddress( + castNode(CreateFunctionStmt, parseTree), missing_ok); + } + default: { /* diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 41fb692af..427babfdb 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -51,6 +51,11 @@ extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId); /* function.c - forward declarations */ +extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString); +extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const + char *queryString); +extern const ObjectAddress * CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, + bool missing_ok); extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString); extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt, bool missing_ok); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 0f883dc23..d0e100a95 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -288,6 +288,25 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B (2 rows) ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator +CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer +AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 6 + localhost | 57638 | t | 6 +(2 rows) + DROP FUNCTION add(int,int); -- call should fail as function should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index eaf02faa5..3870e950c 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -149,6 +149,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator +CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer +AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + DROP FUNCTION add(int,int); -- call should fail as function should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;