diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 8b1dd4423..2c04a9cb0 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -26,22 +26,26 @@ #include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_proc.h" -#include "distributed/commands.h" #include "catalog/pg_type.h" +#include "commands/extension.h" #include "distributed/colocation_utils.h" -#include "distributed/master_protocol.h" +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" #include "distributed/maintenanced.h" -#include "distributed/metadata_sync.h" +#include "distributed/master_protocol.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" #include "parser/parse_coerce.h" +#include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/builtins.h" -#include "utils/fmgrprotos.h" #include "utils/fmgroids.h" +#include "utils/fmgrprotos.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -62,10 +66,23 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); static void TriggerSyncMetadataToPrimaryNodes(void); +static bool ShouldPropagateAlterFunction(const ObjectAddress *address); +static ObjectAddress * FunctionToObjectAddress(ObjectType objectType, + ObjectWithArgs *objectWithArgs, + bool missing_ok); +static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt); PG_FUNCTION_INFO_V1(create_distributed_function); +#if PG_VERSION_NUM > 110000 +#define AssertIsFunctionOrProcedure(objtype) \ + Assert((objtype) == OBJECT_FUNCTION || (objtype) == OBJECT_PROCEDURE) +#else +#define AssertIsFunctionOrProcedure(objtype) \ + Assert(objtype == OBJECT_FUNCTION) +#endif + /* * create_distributed_function gets a function or procedure name with their list of @@ -612,3 +629,472 @@ TriggerSyncMetadataToPrimaryNodes(void) TriggerMetadataSync(MyDatabaseId); } } + + +/* + * ShouldPropagateAlterFunction returns, based on the address of a function, if alter + * statements targeting the function should be propagated. + */ +static bool +ShouldPropagateAlterFunction(const ObjectAddress *address) +{ + if (creating_extension) + { + /* + * extensions should be created separately on the workers, functions cascading + * from an extension should therefore not be propagated. + */ + return false; + } + + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return false; + } + + if (!IsObjectDistributed(address)) + { + /* do not propagate alter function for non-distributed functions */ + return false; + } + + return true; +} + + +/* + * PlanAlterFunctionStmt is invoked for alter function statements with actions. Here we + * plan the jobs to be executed on the workers for functions that have been distributed in + * the cluster. + */ +List * +PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString) +{ + const char *sql = NULL; + const ObjectAddress *address = NULL; + List *commands = NIL; + + /* AlterFunctionStmt->objtype has only been added since pg11 */ +#if PG_VERSION_NUM > 110000 + AssertIsFunctionOrProcedure(stmt->objtype); +#endif + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterFunction(address)) + { + return NIL; + } + + EnsureCoordinator(); + ErrorIfUnsupportedAlterFunctionStmt(stmt); + EnsureSequentialModeForFunctionDDL(); + QualifyTreeNode((Node *) stmt); + sql = DeparseTreeNode((Node *) stmt); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * PlanRenameFunctionStmt is called when the user is renaming a function. The invocation + * happens before the statement is applied locally. + * + * As the function already exists we have access to the ObjectAddress, this is used to + * check if it is distributed. If so the rename is executed on all the workers to keep the + * types in sync across the cluster. + */ +List * +PlanRenameFunctionStmt(RenameStmt *stmt, const char *queryString) +{ + const char *sql = NULL; + const ObjectAddress *address = NULL; + List *commands = NIL; + + AssertIsFunctionOrProcedure(stmt->renameType); + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterFunction(address)) + { + return NIL; + } + + EnsureCoordinator(); + EnsureSequentialModeForFunctionDDL(); + QualifyTreeNode((Node *) stmt); + sql = DeparseTreeNode((Node *) stmt); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * PlanAlterFunctionSchemaStmt is executed before the statement is applied to the local + * postgres instance. + * + * In this stage we can prepare the commands that need to be run on all workers. + */ +List * +PlanAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) +{ + const char *sql = NULL; + const ObjectAddress *address = NULL; + List *commands = NIL; + + AssertIsFunctionOrProcedure(stmt->objectType); + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterFunction(address)) + { + return NIL; + } + + EnsureCoordinator(); + EnsureSequentialModeForFunctionDDL(); + QualifyTreeNode((Node *) stmt); + sql = DeparseTreeNode((Node *) stmt); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * PlanAlterTypeOwnerStmt is called for change of owner ship of functions before the owner + * ship is changed on the local instance. + * + * If the function for which the owner is changed is distributed we execute the change on + * all the workers to keep the type in sync across the cluster. + */ +List * +PlanAlterFunctionOwnerStmt(AlterOwnerStmt *stmt, const char *queryString) +{ + const ObjectAddress *address = NULL; + const char *sql = NULL; + List *commands = NULL; + + AssertIsFunctionOrProcedure(stmt->objectType); + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterFunction(address)) + { + return NIL; + } + + EnsureCoordinator(); + EnsureSequentialModeForFunctionDDL(); + QualifyTreeNode((Node *) stmt); + sql = DeparseTreeNode((Node *) stmt); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * PlanDropFunctionStmt gets called during the planning phase of a DROP FUNCTION statement + * and returns a list of DDLJob's that will drop any distributed functions from the + * workers. + * + * The DropStmt could have multiple objects to drop, the list of objects will be filtered + * to only keep the distributed functions for deletion on the workers. Non-distributed + * functions will still be dropped locally but not on the workers. + */ +List * +PlanDropFunctionStmt(DropStmt *stmt, const char *queryString) +{ + List *deletingObjectWithArgsList = stmt->objects; + List *distributedObjectWithArgsList = NIL; + List *distributedFunctionAddresses = NIL; + ListCell *addressCell = NULL; + const char *dropStmtSql = NULL; + List *commands = NULL; + ListCell *objectWithArgsListCell = NULL; + DropStmt *stmtCopy = NULL; + + AssertIsFunctionOrProcedure(stmt->removeType); + + if (creating_extension) + { + /* + * extensions should be created separately on the workers, types cascading from an + * extension should therefor not be propagated here. + */ + return NIL; + } + + if (!EnableDependencyCreation) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return NIL; + } + + + /* + * Our statements need to be fully qualified so we can drop them from the right schema + * on the workers + */ + QualifyTreeNode((Node *) stmt); + + /* + * iterate over all functions to be dropped and filter to keep only distributed + * functions. + */ + foreach(objectWithArgsListCell, deletingObjectWithArgsList) + { + ObjectWithArgs *func = NULL; + ObjectAddress *address = NULL; + + func = castNode(ObjectWithArgs, lfirst(objectWithArgsListCell)); + address = FunctionToObjectAddress(stmt->removeType, func, stmt->missing_ok); + + if (!IsObjectDistributed(address)) + { + continue; + } + + /* collect information for all distributed functions */ + distributedFunctionAddresses = lappend(distributedFunctionAddresses, address); + distributedObjectWithArgsList = lappend(distributedObjectWithArgsList, func); + } + + if (list_length(distributedObjectWithArgsList) <= 0) + { + /* no distributed functions to drop */ + return NIL; + } + + /* + * managing types can only be done on the coordinator if ddl propagation is on. when + * it is off we will never get here. MX workers don't have a notion of distributed + * types, so we block the call. + */ + EnsureCoordinator(); + EnsureSequentialModeForFunctionDDL(); + + /* remove the entries for the distributed objects on dropping */ + foreach(addressCell, distributedFunctionAddresses) + { + ObjectAddress *address = (ObjectAddress *) lfirst(addressCell); + UnmarkObjectDistributed(address); + } + + /* + * Swap the list of objects before deparsing and restore the old list after. This + * ensures we only have distributed functions in the deparsed drop statement. + */ + stmtCopy = copyObject(stmt); + stmtCopy->objects = distributedObjectWithArgsList; + dropStmtSql = DeparseTreeNode((Node *) stmtCopy); + + commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) dropStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(ALL_WORKERS, commands); +} + + +/* + * ProcessAlterFunctionSchemaStmt is executed after the change has been applied locally, + * we can now use the new dependencies of the function to ensure all its dependencies + * exist on the workers before we apply the commands remotely. + */ +void +ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) +{ + const ObjectAddress *address = NULL; + + AssertIsFunctionOrProcedure(stmt->objectType); + + address = GetObjectAddressFromParseTree((Node *) stmt, false); + if (!ShouldPropagateAlterFunction(address)) + { + return; + } + + /* dependencies have changed (schema) lets ensure they exist */ + EnsureDependenciesExistsOnAllNodes(address); +} + + +/* + * AlterFunctionStmtObjectAddress returns the ObjectAddress of the subject in the + * AlterFunctionStmt. If missing_ok is set to false an error will be raised if postgres + * was unable to find the function/procedure that was the target of the statement. + */ +const ObjectAddress * +AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt, bool missing_ok) +{ + ObjectType objectType = OBJECT_FUNCTION; + +#if PG_VERSION_NUM > 110000 + objectType = stmt->objtype; +#endif + + return FunctionToObjectAddress(objectType, stmt->func, missing_ok); +} + + +/* + * RenameFunctionStmtObjectAddress returns the ObjectAddress of the function that is the + * subject of the RenameStmt. Errors if missing_ok is false. + */ +const ObjectAddress * +RenameFunctionStmtObjectAddress(RenameStmt *stmt, bool missing_ok) +{ + return FunctionToObjectAddress(stmt->renameType, + castNode(ObjectWithArgs, stmt->object), missing_ok); +} + + +/* + * AlterFunctionOwnerObjectAddress returns the ObjectAddress of the function that is the + * subject of the AlterOwnerStmt. Errors if missing_ok is false. + */ +const ObjectAddress * +AlterFunctionOwnerObjectAddress(AlterOwnerStmt *stmt, bool missing_ok) +{ + return FunctionToObjectAddress(stmt->objectType, + castNode(ObjectWithArgs, stmt->object), missing_ok); +} + + +/* + * AlterFunctionSchemaStmtObjectAddress returns the ObjectAddress of the function that is + * the subject of the AlterObjectSchemaStmt. Errors if missing_ok is false. + * + * This could be called both before or after it has been applied locally. It will look in + * the old schema first, if the function cannot be found in that schema it will look in + * the new schema. Errors if missing_ok is false and the type cannot be found in either of + * the schemas. + */ +const ObjectAddress * +AlterFunctionSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok) +{ + ObjectWithArgs *objectWithArgs = NULL; + Oid funcOid = InvalidOid; + List *names = NIL; + ObjectAddress *address = NULL; + + AssertIsFunctionOrProcedure(stmt->objectType); + + objectWithArgs = castNode(ObjectWithArgs, stmt->object); + funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs, true); + names = objectWithArgs->objname; + + if (funcOid == InvalidOid) + { + /* + * couldn't find the function, might have already been moved to the new schema, we + * construct a new objname that uses the new schema to search in. + */ + + /* the name of the function is the last in the list of names */ + Value *funcNameStr = lfirst(list_tail(names)); + List *newNames = list_make2(makeString(stmt->newschema), funcNameStr); + + /* + * we don't error here either, as the error would be not a good user facing + * error if the type didn't exist in the first place. + */ + objectWithArgs->objname = newNames; + funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs, true); + objectWithArgs->objname = names; /* restore the original names */ + + /* + * if the function is still invalid we couldn't find the function, cause postgres + * to error by preforming a lookup once more. Since we know the + */ + if (!missing_ok && funcOid == InvalidOid) + { + /* + * this will most probably throw an error, unless for some reason the function + * has just been created (if possible at all). For safety we assign the + * funcOid. + */ + funcOid = LookupFuncWithArgsCompat(stmt->objectType, objectWithArgs, + missing_ok); + } + } + + address = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*address, ProcedureRelationId, funcOid); + + return address; +} + + +/* + * FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on + * its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to + * false an error will be raised by postgres explaining the Function/Procedure could not + * be found. + */ +static ObjectAddress * +FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, + bool missing_ok) +{ + Oid funcOid = InvalidOid; + ObjectAddress *address = NULL; + + AssertIsFunctionOrProcedure(objectType); + + funcOid = LookupFuncWithArgsCompat(objectType, objectWithArgs, missing_ok); + address = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*address, ProcedureRelationId, funcOid); + + return address; +} + + +/* + * ErrorIfUnsupportedAlterFunctionStmt raises an error if the AlterFunctionStmt contains a + * construct that is not supported to be altered on a distributed function. It is assumed + * the statement passed in is already tested to be targeting a distributed function, and + * will only execute the checks to error on unsupported constructs. + * + * Unsupported Constructs: + * - ALTER FUNCTION ... SET ... FROM CURRENT + */ +static void +ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt) +{ + ListCell *actionCell = NULL; + + foreach(actionCell, stmt->actions) + { + DefElem *action = castNode(DefElem, lfirst(actionCell)); + if (strcmp(action->defname, "set") == 0) + { + VariableSetStmt *setStmt = castNode(VariableSetStmt, action->arg); + if (setStmt->kind == VAR_SET_CURRENT) + { + /* check if the set action is a SET ... FROM CURRENT */ + ereport(ERROR, (errmsg("unsupported ALTER FUNCTION ... SET ... FROM " + "CURRENT for a distributed function"), + errhint("SET FROM CURRENT is not supported for " + "distributed functions, instead use the SET ... " + "TO ... syntax with a constant value."))); + } + } + } +} diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 72dbd195b..9c287f59f 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -121,6 +121,14 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryString) return PlanAlterTypeSchemaStmt(stmt, queryString); } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + return PlanAlterFunctionSchemaStmt(stmt, queryString); + } + default: { /* do nothing for unsupported objects */ @@ -190,6 +198,15 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, const char *queryStrin return; } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + ProcessAlterFunctionSchemaStmt(stmt, queryString); + return; + } + default: { /* do nothing for unsupported objects */ diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 63986a6a1..fc02c5084 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -424,6 +424,14 @@ multi_ProcessUtility(PlannedStmt *pstmt, break; } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + ddlJobs = PlanDropFunctionStmt(dropStatement, queryString); + } + default: { /* unsupported type, skipping*/ @@ -474,6 +482,15 @@ multi_ProcessUtility(PlannedStmt *pstmt, break; } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + ddlJobs = PlanRenameFunctionStmt(renameStmt, queryString); + break; + } + default: { ddlJobs = PlanRenameStmt(renameStmt, queryString); @@ -531,6 +548,12 @@ multi_ProcessUtility(PlannedStmt *pstmt, queryString); } + if (IsA(parsetree, AlterFunctionStmt)) + { + ddlJobs = PlanAlterFunctionStmt(castNode(AlterFunctionStmt, parsetree), + queryString); + } + /* * ALTER TABLE ALL IN TABLESPACE statements have their node type as * AlterTableMoveAllStmt. At the moment we do not support this functionality in @@ -799,6 +822,14 @@ PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString) return PlanAlterTypeOwnerStmt(stmt, queryString); } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + return PlanAlterFunctionOwnerStmt(stmt, queryString); + } + default: { /* do nothing for unsupported alter owner statements */ diff --git a/src/backend/distributed/deparser/objectaddress.c b/src/backend/distributed/deparser/objectaddress.c index 40719fc81..267603a8a 100644 --- a/src/backend/distributed/deparser/objectaddress.c +++ b/src/backend/distributed/deparser/objectaddress.c @@ -76,6 +76,12 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok) missing_ok); } + case T_AlterFunctionStmt: + { + return AlterFunctionStmtObjectAddress(castNode(AlterFunctionStmt, parseTree), + missing_ok); + } + default: { /* @@ -123,6 +129,14 @@ RenameStmtObjectAddress(RenameStmt *stmt, bool missing_ok) return RenameAttributeStmtObjectAddress(stmt, missing_ok); } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + return RenameFunctionStmtObjectAddress(stmt, missing_ok); + } + default: { ereport(ERROR, (errmsg("unsupported rename statement to get object address " @@ -142,6 +156,14 @@ AlterObjectSchemaStmtObjectAddress(AlterObjectSchemaStmt *stmt, bool missing_ok) return AlterTypeSchemaStmtObjectAddress(stmt, missing_ok); } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + return AlterFunctionSchemaStmtObjectAddress(stmt, missing_ok); + } + default: { ereport(ERROR, (errmsg("unsupported alter schema statement to get object " @@ -182,6 +204,14 @@ AlterOwnerStmtObjectAddress(AlterOwnerStmt *stmt, bool missing_ok) return AlterTypeOwnerObjectAddress(stmt, missing_ok); } +#if PG_VERSION_NUM > 110000 + case OBJECT_PROCEDURE: +#endif + case OBJECT_FUNCTION: + { + return AlterFunctionOwnerObjectAddress(stmt, missing_ok); + } + default: { ereport(ERROR, (errmsg("unsupported alter owner statement to get object " diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index da4c93b85..41fb692af 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -50,6 +50,25 @@ extern bool TableReferencing(Oid relationId); extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId); +/* function.c - forward declarations */ +extern List * PlanAlterFunctionStmt(AlterFunctionStmt *stmt, const char *queryString); +extern const ObjectAddress * AlterFunctionStmtObjectAddress(AlterFunctionStmt *stmt, + bool missing_ok); +extern List * PlanRenameFunctionStmt(RenameStmt *stmt, const char *queryString); +extern const ObjectAddress * RenameFunctionStmtObjectAddress(RenameStmt *stmt, + bool missing_ok); +extern List * PlanAlterFunctionOwnerStmt(AlterOwnerStmt *stmt, const char *queryString); +extern const ObjectAddress * AlterFunctionOwnerObjectAddress(AlterOwnerStmt *stmt, + bool missing_ok); +extern List * PlanAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, + const char *queryString); +extern const ObjectAddress * AlterFunctionSchemaStmtObjectAddress( + AlterObjectSchemaStmt *stmt, bool missing_ok); +extern void ProcessAlterFunctionSchemaStmt(AlterObjectSchemaStmt *stmt, + const char *queryString); +extern List * PlanDropFunctionStmt(DropStmt *stmt, const char *queryString); + + /* grant.c - forward declarations */ extern List * PlanGrantStmt(GrantStmt *grantStmt); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index ec0f81d73..0f883dc23 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -10,6 +10,7 @@ SELECT run_command_on_workers($$CREATE USER functionuser;$$); (2 rows) CREATE SCHEMA function_tests AUTHORIZATION functionuser; +CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; -- Create and distribute a simple function @@ -56,6 +57,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure to propagate ddl propagation after we have setup our functions, this will +-- allow alter statements to be propagated and keep the functions in sync across machines +SET citus.enable_ddl_propagation TO on; +-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. +CREATE TABLE colocation_table(id int); +SELECT create_distributed_table('colocation_table','id'); + create_distributed_table +-------------------------- + +(1 row) + -- make sure that none of the active and primary nodes hasmetadata -- at the start of the test select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; @@ -121,6 +133,170 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY localhost | 57638 | t | 5 (2 rows) +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- Test SET/RESET for alter function +ALTER FUNCTION add(int,int) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs +ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT; +ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function +HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; +ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function +HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER; +ERROR: unsupported ALTER FUNCTION ... SET ... FROM CURRENT for a distributed function +HINT: SET FROM CURRENT is not supported for distributed functions, instead use the SET ... TO ... syntax with a constant value. +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- rename function and make sure the new name can be used on the workers while the old name can't +ALTER FUNCTION add(int,int) RENAME TO add2; +SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist +(2 rows) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + +ALTER FUNCTION add2(int,int) RENAME TO add; +-- change the owner of the function and verify the owner has been changed on the workers +ALTER FUNCTION add(int,int) OWNER TO functionuser; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'add'; +$$); + run_command_on_workers +--------------------------------------------------------- + (localhost,57637,t,"(functionuser,function_tests,add)") + (localhost,57638,t,"(functionuser,function_tests,add)") +(2 rows) + +-- change the schema of the function and verify the old schema doesn't exist anymore while +-- the new schema has the function. +ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; +SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist +(2 rows) + +SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + +ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; +DROP FUNCTION add(int,int); +-- call should fail as function should have been dropped +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: function function_tests.add(integer, integer) does not exist + localhost | 57638 | f | ERROR: function function_tests.add(integer, integer) does not exist +(2 rows) + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); ERROR: syntax error at or near "int" @@ -371,6 +547,7 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; -- This is hacky, but we should clean-up the resources as below \c - - - :worker_1_port SET client_min_messages TO error; -- suppress cascading objects dropping @@ -385,6 +562,7 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; \c - - - :worker_2_port SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; @@ -398,6 +576,7 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; \c - - - :master_port DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); diff --git a/src/test/regress/expected/distributed_procedure.out b/src/test/regress/expected/distributed_procedure.out new file mode 100644 index 000000000..673fc2df6 --- /dev/null +++ b/src/test/regress/expected/distributed_procedure.out @@ -0,0 +1,204 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q +\endif +SET citus.next_shard_id TO 20030000; +CREATE USER procedureuser; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +SELECT run_command_on_workers($$CREATE USER procedureuser;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +CREATE SCHEMA procedure_tests AUTHORIZATION procedureuser; +CREATE SCHEMA procedure_tests2 AUTHORIZATION procedureuser; +SET search_path TO procedure_tests; +SET citus.shard_count TO 4; +-- Create and distribute a simple function +CREATE OR REPLACE PROCEDURE raise_info(text) + LANGUAGE PLPGSQL AS $proc$ +BEGIN + RAISE INFO 'information message %', $1; +END; +$proc$; +-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. +CREATE TABLE colocation_table(id text); +SELECT create_distributed_table('colocation_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_function('raise_info(text)', '$1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | CALL + localhost | 57638 | t | CALL +(2 rows) + +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +ALTER PROCEDURE raise_info(text) SECURITY INVOKER; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER PROCEDURE raise_info(text) SECURITY DEFINER; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- Test SET/RESET for alter procedure +ALTER PROCEDURE raise_info(text) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER PROCEDURE raise_info(text) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER PROCEDURE raise_info(text) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +ALTER PROCEDURE raise_info(text) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +-- rename function and make sure the new name can be used on the workers while the old name can't +ALTER PROCEDURE raise_info(text) RENAME TO raise_info2; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info2(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist + localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist +(2 rows) + +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info2('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | CALL + localhost | 57638 | t | CALL +(2 rows) + +ALTER PROCEDURE raise_info2(text) RENAME TO raise_info; +-- change the owner of the function and verify the owner has been changed on the workers +ALTER PROCEDURE raise_info(text) OWNER TO procedureuser; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'raise_info'; +$$); + run_command_on_workers +------------------------------------------------------------------ + (localhost,57637,t,"(procedureuser,procedure_tests,raise_info)") + (localhost,57638,t,"(procedureuser,procedure_tests,raise_info)") +(2 rows) + +-- change the schema of the procedure and verify the old schema doesn't exist anymore while +-- the new schema has the function. +ALTER PROCEDURE raise_info(text) SET SCHEMA procedure_tests2; +SELECT public.verify_function_is_same_on_workers('procedure_tests2.raise_info(text)'); + verify_function_is_same_on_workers +------------------------------------ + t +(1 row) + +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist + localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist +(2 rows) + +SELECT * FROM run_command_on_workers($$CALL procedure_tests2.raise_info('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | CALL + localhost | 57638 | t | CALL +(2 rows) + +ALTER PROCEDURE procedure_tests2.raise_info(text) SET SCHEMA procedure_tests; +DROP PROCEDURE raise_info(text); +-- call should fail as procedure should have been dropped +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+---------------------------------------------------------------------- + localhost | 57637 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist + localhost | 57638 | f | ERROR: procedure procedure_tests.raise_info(unknown) does not exist +(2 rows) + +SET client_min_messages TO error; -- suppress cascading objects dropping +DROP SCHEMA procedure_tests CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA procedure_tests CASCADE;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"DROP SCHEMA") + (localhost,57638,t,"DROP SCHEMA") +(2 rows) + +DROP SCHEMA procedure_tests2 CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"DROP SCHEMA") + (localhost,57638,t,"DROP SCHEMA") +(2 rows) + +DROP USER procedureuser; +SELECT run_command_on_workers($$DROP USER procedureuser;$$); + run_command_on_workers +--------------------------------- + (localhost,57637,t,"DROP ROLE") + (localhost,57638,t,"DROP ROLE") +(2 rows) + diff --git a/src/test/regress/expected/distributed_procedure_0.out b/src/test/regress/expected/distributed_procedure_0.out new file mode 100644 index 000000000..a5ea9fa80 --- /dev/null +++ b/src/test/regress/expected/distributed_procedure_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 26d41b18f..8cd8ffa15 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -276 275 f +274 273 f transactionnumberwaitingtransactionnumbers -275 -276 275 +273 +274 273 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -280 279 f -281 279 f -281 280 t +278 277 f +279 277 f +279 278 t transactionnumberwaitingtransactionnumbers -279 -280 279 -281 279,280 +277 +278 277 +279 277,278 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out index c8da37112..a1f4af81e 100644 --- a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -1571,7 +1571,7 @@ master_remove_node -starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s1-print-distributed-objects +starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s2-print-distributed-objects ?column? 1 @@ -1659,9 +1659,7 @@ step s3-wait-for-metadata-sync: wait_until_metadata_sync -step s1-print-distributed-objects: - SELECT 1 FROM master_add_node('localhost', 57638); - +step s2-print-distributed-objects: -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; @@ -1677,11 +1675,6 @@ step s1-print-distributed-objects: SELECT count(*) FROM pg_proc WHERE proname='add'; SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); - SELECT master_remove_node('localhost', 57638); - -?column? - -1 pg_identify_object_as_address (function,"{public,add}","{integer,integer}") @@ -1709,9 +1702,6 @@ run_command_on_workers master_remove_node -master_remove_node - - starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s2-begin s2-commit s3-wait-for-metadata-sync s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects diff --git a/src/test/regress/expected/multi_deparse_function.out b/src/test/regress/expected/multi_deparse_function.out index 6a2d10728..85d72fbe7 100644 --- a/src/test/regress/expected/multi_deparse_function.out +++ b/src/test/regress/expected/multi_deparse_function.out @@ -31,6 +31,7 @@ -- DROP FUNCTION [ IF EXISTS ] name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] [, ...] -- [ CASCADE | RESTRICT ] SET citus.next_shard_id TO 20020000; +SET citus.enable_ddl_propagation TO off; CREATE SCHEMA function_tests; SET search_path TO function_tests; SET citus.shard_count TO 4; diff --git a/src/test/regress/expected/multi_deparse_procedure.out b/src/test/regress/expected/multi_deparse_procedure.out index 355eac644..d17865bc1 100644 --- a/src/test/regress/expected/multi_deparse_procedure.out +++ b/src/test/regress/expected/multi_deparse_procedure.out @@ -1,3 +1,10 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q +\endif -- -- Regression tests for deparsing ALTER/DROP PROCEDURE Queries -- @@ -26,6 +33,7 @@ -- For example CALLED ON NULL INPUT action is valid only for FUNCTIONS, but we still -- allow deparsing them here. SET citus.next_shard_id TO 20030000; +SET citus.enable_ddl_propagation TO off; CREATE SCHEMA procedure_tests; SET search_path TO procedure_tests; SET citus.shard_count TO 4; @@ -288,8 +296,9 @@ $cmd$); (localhost,57638,t,"ALTER PROCEDURE") (2 rows) +-- rename and rename back to keep the nodes in sync SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RENAME TO summation +ALTER PROCEDURE raise_info RENAME TO summation; $cmd$); deparse_and_run_on_workers --------------------------------------- @@ -297,16 +306,34 @@ $cmd$); (localhost,57638,t,"ALTER PROCEDURE") (2 rows) -CREATE ROLE PROCEDURE_role; +ALTER PROCEDURE raise_info RENAME TO summation; +SELECT deparse_and_run_on_workers($cmd$ +ALTER PROCEDURE summation RENAME TO raise_info; +$cmd$); + deparse_and_run_on_workers +--------------------------------------- + (localhost,57637,t,"ALTER PROCEDURE") + (localhost,57638,t,"ALTER PROCEDURE") +(2 rows) + +ALTER PROCEDURE summation RENAME TO raise_info; +CREATE ROLE procedure_role; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. +SELECT run_command_on_workers($$CREATE ROLE procedure_role;$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role +ALTER PROCEDURE raise_info OWNER TO procedure_role $cmd$); - deparse_and_run_on_workers ----------------------------------------------------------------------- - (localhost,57637,f,"ERROR: role ""procedure_role"" does not exist") - (localhost,57638,f,"ERROR: role ""procedure_role"" does not exist") + deparse_and_run_on_workers +--------------------------------------- + (localhost,57637,t,"ALTER PROCEDURE") + (localhost,57638,t,"ALTER PROCEDURE") (2 rows) SELECT deparse_and_run_on_workers($cmd$ @@ -318,26 +345,38 @@ $cmd$); (localhost,57638,f,"ERROR: role ""missing_role"" does not exist") (2 rows) +-- move schema and back to keep the nodes in sync SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET SCHEMA public +ALTER PROCEDURE raise_info SET SCHEMA public; $cmd$); - deparse_and_run_on_workers ------------------------------------------------------------------------------------------ - (localhost,57637,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist") - (localhost,57638,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist") + deparse_and_run_on_workers +--------------------------------------- + (localhost,57637,t,"ALTER PROCEDURE") + (localhost,57638,t,"ALTER PROCEDURE") (2 rows) +ALTER PROCEDURE raise_info SET SCHEMA public; +SELECT deparse_and_run_on_workers($cmd$ +ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests; +$cmd$); + deparse_and_run_on_workers +--------------------------------------- + (localhost,57637,t,"ALTER PROCEDURE") + (localhost,57638,t,"ALTER PROCEDURE") +(2 rows) + +ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests; SELECT deparse_and_run_on_workers($cmd$ ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus $cmd$); - deparse_and_run_on_workers ------------------------------------------------------------------------------------------ - (localhost,57637,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist") - (localhost,57638,f,"ERROR: procedure procedure_tests.raise_info(text) does not exist") + deparse_and_run_on_workers +--------------------------------------- + (localhost,57637,t,"ALTER PROCEDURE") + (localhost,57638,t,"ALTER PROCEDURE") (2 rows) SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS raise_info(int,int); +DROP PROCEDURE raise_info(text); $cmd$); deparse_and_run_on_workers -------------------------------------- @@ -376,4 +415,11 @@ $cmd$); -- clear objects SET client_min_messages TO WARNING; -- suppress cascading objects dropping DROP SCHEMA procedure_tests CASCADE; -DROP ROLE PROCEDURE_role; +DROP ROLE procedure_role; +SELECT run_command_on_workers($$DROP ROLE procedure_role;$$); + run_command_on_workers +--------------------------------- + (localhost,57637,t,"DROP ROLE") + (localhost,57638,t,"DROP ROLE") +(2 rows) + diff --git a/src/test/regress/expected/multi_deparse_procedure_0.out b/src/test/regress/expected/multi_deparse_procedure_0.out index 7ce060636..a5ea9fa80 100644 --- a/src/test/regress/expected/multi_deparse_procedure_0.out +++ b/src/test/regress/expected/multi_deparse_procedure_0.out @@ -1,448 +1,6 @@ --- --- Regression tests for deparsing ALTER/DROP PROCEDURE Queries --- --- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] --- action [ ... ] [ RESTRICT ] --- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] --- RENAME TO new_name --- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] --- OWNER TO { new_owner | CURRENT_USER | SESSION_USER } --- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] --- SET SCHEMA new_schema --- ALTER PROCEDURE name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] --- DEPENDS ON EXTENSION extension_name --- where action is one of: --- [ EXTERNAL ] SECURITY INVOKER | [ EXTERNAL ] SECURITY DEFINER --- SET configuration_parameter { TO | = } { value | DEFAULT } --- SET configuration_parameter FROM CURRENT --- RESET configuration_parameter --- RESET ALL --- --- DROP PROCEDURE [ IF EXISTS ] name [ ( [ [ argmode ] [ argname ] argtype [, ...] ] ) ] [, ...] --- [ CASCADE | RESTRICT ] --- --- Please note that current deparser does not return errors on some invalid queries. --- --- For example CALLED ON NULL INPUT action is valid only for FUNCTIONS, but we still --- allow deparsing them here. -SET citus.next_shard_id TO 20030000; -CREATE SCHEMA procedure_tests; -SET search_path TO procedure_tests; -SET citus.shard_count TO 4; -SET client_min_messages TO INFO; --- print whether we're using version > 10 to make version-specific tests clear SHOW server_version \gset -SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; - version_above_ten -------------------- - f -(1 row) - -CREATE FUNCTION deparse_test(text) - RETURNS text - AS 'citus' - LANGUAGE C STRICT; -CREATE FUNCTION deparse_and_run_on_workers(text) - RETURNS SETOF record - AS $fnc$ - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - $fnc$ - LANGUAGE SQL; --- Create a simple PROCEDURE and distribute it -CREATE OR REPLACE PROCEDURE raise_info(text) -LANGUAGE PLPGSQL AS $proc$ -BEGIN - RAISE INFO 'information message %', $1; -END; -$proc$; -ERROR: syntax error at or near "PROCEDURE" -LINE 1: CREATE OR REPLACE PROCEDURE raise_info(text) - ^ -SELECT create_distributed_function('raise_info(text)'); -ERROR: function "raise_info(text)" does not exist -LINE 1: SELECT create_distributed_function('raise_info(text)'); - ^ -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info CALLED ON NULL INPUT -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RETURNS NULL ON NULL INPUT -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info STRICT -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info IMMUTABLE -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info STABLE -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info VOLATILE -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info LEAKPROOF -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info NOT LEAKPROOF -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info EXTERNAL SECURITY INVOKER -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SECURITY INVOKER -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info EXTERNAL SECURITY DEFINER -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SECURITY DEFINER -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info PARALLEL UNSAFE -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info PARALLEL RESTRICTED -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info PARALLEL SAFE -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 --- The COST/ROWS arguments should always be numeric -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info COST 1234 -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info COST 1234.5 -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info ROWS 10 -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info ROWS 10.8 -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SECURITY INVOKER SET client_min_messages TO warning; -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET log_min_messages = ERROR -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET log_min_messages TO DEFAULT -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET log_min_messages FROM CURRENT -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RESET log_min_messages -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RESET ALL -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RENAME TO summation -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -CREATE ROLE PROCEDURE_role; -NOTICE: not propagating CREATE ROLE/USER commands to worker nodes -HINT: Connect to worker nodes directly to manually create all necessary users and roles. -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info OWNER TO missing_role -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET SCHEMA public -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS raise_info(int,int); -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 --- Check that an invalid PROCEDURE name is still parsed correctly -SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS missing_PROCEDURE(int, text); -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS missing_schema.missing_PROCEDURE(int,float); -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 -SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS missing_schema.missing_PROCEDURE(int,float) CASCADE; -$cmd$); -ERROR: syntax error at or near "PROCEDURE" -LINE 2: WITH deparsed_query AS ( SELECT deparse_test($1) qualifi... - ^ -QUERY: - WITH deparsed_query AS ( SELECT deparse_test($1) qualified_query ) - SELECT run_command_on_workers(qualified_query) FROM deparsed_query d - -CONTEXT: SQL function "deparse_and_run_on_workers" statement 1 --- clear objects -SET client_min_messages TO WARNING; -- suppress cascading objects dropping -DROP SCHEMA procedure_tests CASCADE; -DROP ROLE PROCEDURE_role; +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index d3504e70e..d9a7f3ebc 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -54,6 +54,8 @@ WHERE pgd.refclassid = 'pg_extension'::regclass AND -- DROP EXTENSION pre-created by the regression suite DROP EXTENSION citus; \c +-- these tests switch between citus versions and call ddl's that require pg_dist_object to be created +SET citus.enable_object_propagation TO 'false'; SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 799324e22..ce648077f 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -150,3 +150,23 @@ BEGIN from pg_proc, pg_dist_partition where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; END;$$; +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$func$; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 2c1084cd0..af1bf504e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -282,6 +282,7 @@ test: ssl_by_default # --------- test: distributed_types distributed_types_conflict disable_object_propagation test: distributed_functions +test: distributed_procedure # --------- # deparsing logic tests diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec index 3d51a57d8..9686d007d 100644 --- a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -30,20 +30,6 @@ teardown DROP TYPE IF EXISTS tt1 CASCADE; DROP FUNCTION IF EXISTS add(INT,INT) CASCADE; - -- Remove the pg_dist_object record manually as we do not yet hook into DROP FUNC - -- queries. If the function does not exist, the casting to regprocedure fails. - DO - $do$ - BEGIN - DELETE FROM citus.pg_dist_object WHERE objid = 'add(int,int)'::regprocedure; - EXCEPTION - WHEN undefined_function THEN RETURN; - END; - $do$; - - -- similarly drop the function in the workers manually - SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$); - SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; } @@ -230,7 +216,7 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-sche # s3-wait-for-metadata-sync step, we do "s2-begin" followed directly by # "s2-commit", because "COMMIT" syncs the messages -permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" # we cannot run the following operations concurrently diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 88985e04e..eaf02faa5 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -4,6 +4,7 @@ CREATE USER functionuser; SELECT run_command_on_workers($$CREATE USER functionuser;$$); CREATE SCHEMA function_tests AUTHORIZATION functionuser; +CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; @@ -62,6 +63,14 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure to propagate ddl propagation after we have setup our functions, this will +-- allow alter statements to be propagated and keep the functions in sync across machines +SET citus.enable_ddl_propagation TO on; + +-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. +CREATE TABLE colocation_table(id int); +SELECT create_distributed_table('colocation_table','id'); + -- make sure that none of the active and primary nodes hasmetadata -- at the start of the test select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; @@ -84,6 +93,65 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY SELECT create_distributed_function('add(int,int)', '$1'); SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +ALTER FUNCTION add(int,int) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE LEAKPROOF COST 5; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) STRICT VOLATILE PARALLEL SAFE; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + +-- Test SET/RESET for alter function +ALTER FUNCTION add(int,int) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + +-- SET ... FROM CURRENT is not supported, verify the query fails with a descriptive error irregardless of where in the action list the statement occurs +ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) RETURNS NULL ON NULL INPUT SET client_min_messages FROM CURRENT; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +ALTER FUNCTION add(int,int) SET client_min_messages FROM CURRENT SECURITY DEFINER; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); + +-- rename function and make sure the new name can be used on the workers while the old name can't +ALTER FUNCTION add(int,int) RENAME TO add2; +SELECT public.verify_function_is_same_on_workers('function_tests.add2(int,int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers('SELECT function_tests.add2(2,3);') ORDER BY 1,2; +ALTER FUNCTION add2(int,int) RENAME TO add; + +-- change the owner of the function and verify the owner has been changed on the workers +ALTER FUNCTION add(int,int) OWNER TO functionuser; +SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'add'; +$$); + +-- change the schema of the function and verify the old schema doesn't exist anymore while +-- the new schema has the function. +ALTER FUNCTION add(int,int) SET SCHEMA function_tests2; +SELECT public.verify_function_is_same_on_workers('function_tests2.add(int,int)'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER BY 1,2; +ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; + +DROP FUNCTION add(int,int); +-- call should fail as function should have been dropped +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); @@ -215,6 +283,7 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; -- This is hacky, but we should clean-up the resources as below @@ -224,6 +293,7 @@ UPDATE pg_dist_local_group SET groupid = 0; SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; \c - - - :worker_2_port SET client_min_messages TO error; -- suppress cascading objects dropping @@ -231,6 +301,7 @@ UPDATE pg_dist_local_group SET groupid = 0; SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; \c - - - :master_port diff --git a/src/test/regress/sql/distributed_procedure.sql b/src/test/regress/sql/distributed_procedure.sql new file mode 100644 index 000000000..453eaeea9 --- /dev/null +++ b/src/test/regress/sql/distributed_procedure.sql @@ -0,0 +1,90 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q +\endif + +SET citus.next_shard_id TO 20030000; + +CREATE USER procedureuser; +SELECT run_command_on_workers($$CREATE USER procedureuser;$$); + +CREATE SCHEMA procedure_tests AUTHORIZATION procedureuser; +CREATE SCHEMA procedure_tests2 AUTHORIZATION procedureuser; + +SET search_path TO procedure_tests; +SET citus.shard_count TO 4; + +-- Create and distribute a simple function +CREATE OR REPLACE PROCEDURE raise_info(text) + LANGUAGE PLPGSQL AS $proc$ +BEGIN + RAISE INFO 'information message %', $1; +END; +$proc$; + +-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. +CREATE TABLE colocation_table(id text); +SELECT create_distributed_table('colocation_table','id'); + +SELECT create_distributed_function('raise_info(text)', '$1'); +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +ALTER PROCEDURE raise_info(text) SECURITY INVOKER; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); +ALTER PROCEDURE raise_info(text) SECURITY DEFINER; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + +-- Test SET/RESET for alter procedure +ALTER PROCEDURE raise_info(text) SET client_min_messages TO warning; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); +ALTER PROCEDURE raise_info(text) SET client_min_messages TO error; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); +ALTER PROCEDURE raise_info(text) SET client_min_messages TO debug; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); +ALTER PROCEDURE raise_info(text) RESET client_min_messages; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); + +-- rename function and make sure the new name can be used on the workers while the old name can't +ALTER PROCEDURE raise_info(text) RENAME TO raise_info2; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info2(text)'); +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info2('hello');$$) ORDER BY 1,2; +ALTER PROCEDURE raise_info2(text) RENAME TO raise_info; + +-- change the owner of the function and verify the owner has been changed on the workers +ALTER PROCEDURE raise_info(text) OWNER TO procedureuser; +SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); +SELECT run_command_on_workers($$ +SELECT row(usename, nspname, proname) +FROM pg_proc +JOIN pg_user ON (usesysid = proowner) +JOIN pg_namespace ON (pg_namespace.oid = pronamespace) +WHERE proname = 'raise_info'; +$$); + +-- change the schema of the procedure and verify the old schema doesn't exist anymore while +-- the new schema has the function. +ALTER PROCEDURE raise_info(text) SET SCHEMA procedure_tests2; +SELECT public.verify_function_is_same_on_workers('procedure_tests2.raise_info(text)'); +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; +SELECT * FROM run_command_on_workers($$CALL procedure_tests2.raise_info('hello');$$) ORDER BY 1,2; +ALTER PROCEDURE procedure_tests2.raise_info(text) SET SCHEMA procedure_tests; + +DROP PROCEDURE raise_info(text); +-- call should fail as procedure should have been dropped +SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; + +SET client_min_messages TO error; -- suppress cascading objects dropping +DROP SCHEMA procedure_tests CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA procedure_tests CASCADE;$$); +DROP SCHEMA procedure_tests2 CASCADE; +SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$); +DROP USER procedureuser; +SELECT run_command_on_workers($$DROP USER procedureuser;$$); diff --git a/src/test/regress/sql/multi_deparse_function.sql b/src/test/regress/sql/multi_deparse_function.sql index a673e3170..31b1a053a 100644 --- a/src/test/regress/sql/multi_deparse_function.sql +++ b/src/test/regress/sql/multi_deparse_function.sql @@ -32,6 +32,7 @@ -- [ CASCADE | RESTRICT ] SET citus.next_shard_id TO 20020000; +SET citus.enable_ddl_propagation TO off; CREATE SCHEMA function_tests; SET search_path TO function_tests; diff --git a/src/test/regress/sql/multi_deparse_procedure.sql b/src/test/regress/sql/multi_deparse_procedure.sql index 0261b3c9e..334b058cf 100644 --- a/src/test/regress/sql/multi_deparse_procedure.sql +++ b/src/test/regress/sql/multi_deparse_procedure.sql @@ -1,3 +1,10 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 11 AS server_verion_eleven_and_above + \gset +\if :server_verion_eleven_and_above +\else +\q +\endif -- -- Regression tests for deparsing ALTER/DROP PROCEDURE Queries -- @@ -29,6 +36,7 @@ -- allow deparsing them here. SET citus.next_shard_id TO 20030000; +SET citus.enable_ddl_propagation TO off; CREATE SCHEMA procedure_tests; SET search_path TO procedure_tests; @@ -167,30 +175,43 @@ SELECT deparse_and_run_on_workers($cmd$ ALTER PROCEDURE raise_info RESET ALL $cmd$); +-- rename and rename back to keep the nodes in sync SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info RENAME TO summation +ALTER PROCEDURE raise_info RENAME TO summation; $cmd$); +ALTER PROCEDURE raise_info RENAME TO summation; +SELECT deparse_and_run_on_workers($cmd$ +ALTER PROCEDURE summation RENAME TO raise_info; +$cmd$); +ALTER PROCEDURE summation RENAME TO raise_info; -CREATE ROLE PROCEDURE_role; +CREATE ROLE procedure_role; +SELECT run_command_on_workers($$CREATE ROLE procedure_role;$$); SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info OWNER TO PROCEDURE_role +ALTER PROCEDURE raise_info OWNER TO procedure_role $cmd$); SELECT deparse_and_run_on_workers($cmd$ ALTER PROCEDURE raise_info OWNER TO missing_role $cmd$); +-- move schema and back to keep the nodes in sync SELECT deparse_and_run_on_workers($cmd$ -ALTER PROCEDURE raise_info SET SCHEMA public +ALTER PROCEDURE raise_info SET SCHEMA public; $cmd$); +ALTER PROCEDURE raise_info SET SCHEMA public; +SELECT deparse_and_run_on_workers($cmd$ +ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests; +$cmd$); +ALTER PROCEDURE public.raise_info SET SCHEMA procedure_tests; SELECT deparse_and_run_on_workers($cmd$ ALTER PROCEDURE raise_info DEPENDS ON EXTENSION citus $cmd$); SELECT deparse_and_run_on_workers($cmd$ -DROP PROCEDURE IF EXISTS raise_info(int,int); +DROP PROCEDURE raise_info(text); $cmd$); -- Check that an invalid PROCEDURE name is still parsed correctly @@ -209,4 +230,5 @@ $cmd$); -- clear objects SET client_min_messages TO WARNING; -- suppress cascading objects dropping DROP SCHEMA procedure_tests CASCADE; -DROP ROLE PROCEDURE_role; +DROP ROLE procedure_role; +SELECT run_command_on_workers($$DROP ROLE procedure_role;$$); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index be8d0432a..39c80bd4b 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -53,6 +53,9 @@ WHERE pgd.refclassid = 'pg_extension'::regclass AND DROP EXTENSION citus; \c +-- these tests switch between citus versions and call ddl's that require pg_dist_object to be created +SET citus.enable_object_propagation TO 'false'; + SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '7.0-1'; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 0e853971d..28e464f7a 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -152,4 +152,23 @@ BEGIN where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; END;$$; +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + RETURN true; +END; +$func$;