From 473cbc011501f47c988d22ff97e191c345063d21 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 30 Sep 2019 12:41:17 +0200 Subject: [PATCH] Propagate CREATE OR REPLACE FUNCTION to workers for distributed functions (#3043) DESCRIPTION: Propagate CREATE OR REPLACE FUNCTION Distributed functions could be replaced, which should be propagated to the workers to keep the function in sync between all nodes. Due to the complexity of deparsing the `CreateFunctionStmt` we actually produce the plan during the processing phase of our utilityhook. Since the changes have already been made in the catalog tables we can reuse `pg_get_functiondef` to get us the generated `CREATE OR REPLACE` sql. --- src/backend/distributed/commands/function.c | 148 ++++++++++++++++++ .../distributed/commands/utility_hook.c | 13 ++ .../distributed/deparser/objectaddress.c | 6 + src/include/distributed/commands.h | 5 + .../expected/distributed_functions.out | 19 +++ .../regress/sql/distributed_functions.sql | 9 ++ 6 files changed, 200 insertions(+) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 2c04a9cb0..079f74a91 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -66,6 +66,7 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); static void TriggerSyncMetadataToPrimaryNodes(void); +static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static ObjectAddress * FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, @@ -631,6 +632,57 @@ TriggerSyncMetadataToPrimaryNodes(void) } +/* + * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION + * statement. 
We only propagate replaces of distributed functions to keep the function on + * the workers in sync with the one on the coordinator. + */ +static bool +ShouldPropagateCreateFunction(CreateFunctionStmt *stmt) +{ + const ObjectAddress *address = NULL; + + if (creating_extension) + { + /* + * extensions should be created separately on the workers, functions cascading + * from an extension should therefore not be propagated. + */ + return false; + } + + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return false; + } + + if (!stmt->replace) + { + /* + * Since we only care for a replace of distributed functions if the statement is + * not a replace we are going to ignore it. + */ + return false; + } + + /* + * Even though it's a replace we should accept a non-existing function, it will just + * not be distributed + */ + address = GetObjectAddressFromParseTree((Node *) stmt, true); + if (!IsObjectDistributed(address)) + { + /* do not propagate create or replace for non-distributed functions */ + return false; + } + + return true; +} + + +/* * ShouldPropagateAlterFunction returns, based on the address of a function, if alter * statements targeting the function should be propagated. @@ -665,6 +717,102 @@ ShouldPropagateAlterFunction(const ObjectAddress *address) } +/* + * PlanCreateFunctionStmt is called during the planning phase for CREATE [OR REPLACE] + * FUNCTION. We primarily care for the replace variant of this statement to keep + * distributed functions in sync. We bail via a check on ShouldPropagateCreateFunction + * which checks for the OR REPLACE modifier. + * + * Since we use pg_get_functiondef to get the ddl command we actually do not do any + * planning here, instead we defer the plan creation to the processing step. + * + * Instead we do our basic housekeeping where we make sure we are on the coordinator and + * can propagate the function in sequential mode. 
+ */ +List * +PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString) +{ + if (!ShouldPropagateCreateFunction(stmt)) + { + return NIL; + } + + EnsureCoordinator(); + + EnsureSequentialModeForFunctionDDL(); + + /* + * ddl jobs will be generated during the Processing phase as we need the function to + * be updated in the catalog to get its sql representation + */ + return NIL; +} + + +/* + * ProcessCreateFunctionStmt creates the plan we need to execute for function propagation. + * It runs after the catalog update because the sql comes from pg_get_functiondef. + * + * Besides creating the plan we also make sure all (new) dependencies of the function are + * created on all nodes. + */ +List * +ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString) +{ + const ObjectAddress *address = NULL; + const char *sql = NULL; + List *commands = NIL; + + if (!ShouldPropagateCreateFunction(stmt)) + { + return NIL; + } + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + EnsureDependenciesExistsOnAllNodes(address); + + sql = GetFunctionDDLCommand(address->objectId); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * CreateFunctionStmtObjectAddress returns the ObjectAddress for the subject of the + * CREATE [OR REPLACE] FUNCTION statement. If missing_ok is false it will error with the + * normal postgres error for functions that cannot be found. 
+ */ +const ObjectAddress * +CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, bool missing_ok) +{ + ObjectType objectType = OBJECT_FUNCTION; + ObjectWithArgs *objectWithArgs = NULL; + ListCell *parameterCell = NULL; + +#if PG_VERSION_NUM >= 110000 /* 11.0 reports 110000 and already has is_procedure */ + if (stmt->is_procedure) + { + objectType = OBJECT_PROCEDURE; + } +#endif + + objectWithArgs = makeNode(ObjectWithArgs); + objectWithArgs->objname = stmt->funcname; + + foreach(parameterCell, stmt->parameters) + { + FunctionParameter *funcParam = castNode(FunctionParameter, lfirst(parameterCell)); + objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType); + } + + return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok); +} + + +/* * PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we * plan the jobs to be executed on the workers for functions that have been distributed in diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index fc02c5084..3ed5eabfd 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -554,6 +554,12 @@ multi_ProcessUtility(PlannedStmt *pstmt, queryString); } + if (IsA(parsetree, CreateFunctionStmt)) + { + ddlJobs = PlanCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree), + queryString); + } + /* * ALTER TABLE ALL IN TABLESPACE statements have their node type as * AlterTableMoveAllStmt. 
At the moment we do not support this functionality in @@ -706,6 +712,13 @@ multi_ProcessUtility(PlannedStmt *pstmt, { ProcessAlterEnumStmt(castNode(AlterEnumStmt, parsetree), queryString); } + + if (IsA(parsetree, CreateFunctionStmt)) + { + Assert(ddlJobs == NIL); /* jobs should not have been set before */ + ddlJobs = ProcessCreateFunctionStmt(castNode(CreateFunctionStmt, parsetree), + queryString); + } } /* diff --git a/src/backend/distributed/deparser/objectaddress.c b/src/backend/distributed/deparser/objectaddress.c index 267603a8a..9b69ccfa3 100644 --- a/src/backend/distributed/deparser/objectaddress.c +++ b/src/backend/distributed/deparser/objectaddress.c @@ -82,6 +82,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok) missing_ok); } + case T_CreateFunctionStmt: + { + return CreateFunctionStmtObjectAddress( + castNode(CreateFunctionStmt, parseTree), missing_ok); + } + default: { /* diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 41fb692af..427babfdb 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -51,6 +51,11 @@ extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId); /* function.c - forward declarations */ +extern List * PlanCreateFunctionStmt(CreateFunctionStmt *stmt, const char *queryString); +extern List * ProcessCreateFunctionStmt(CreateFunctionStmt *stmt, const + char *queryString); +extern const ObjectAddress * CreateFunctionStmtObjectAddress(CreateFunctionStmt *stmt, + bool missing_ok); extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString); extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt, bool missing_ok); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 0f883dc23..d0e100a95 100644 --- a/src/test/regress/expected/distributed_functions.out +++ 
b/src/test/regress/expected/distributed_functions.out @@ -288,6 +288,25 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B (2 rows) ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator +CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer +AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 6 + localhost | 57638 | t | 6 +(2 rows) + DROP FUNCTION add(int,int); -- call should fail as function should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index eaf02faa5..3870e950c 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -149,6 +149,15 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator +CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer +AS 'select $1 * $2;' -- I know, this is not an add, 
but the output will tell us if the update succeeded + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + DROP FUNCTION add(int,int); -- call should fail as function should have been dropped SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;