diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 1d0c7e3fb..295af375f 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -83,7 +83,7 @@ static void EnsureSequentialModeForFunctionDDL(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); -static ObjectAddress * UndistributableRelationDependencyOfFunction( +static ObjectAddress * GetUndistributableRelationDependency( ObjectAddress *functionAddress); static ObjectAddress FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, @@ -1337,8 +1337,8 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) return NIL; } - ObjectAddress *undistributableDependency = - UndistributableRelationDependencyOfFunction(&functionAddress); + ObjectAddress *undistributableDependency = GetUndistributableRelationDependency( + &functionAddress); if (undistributableDependency != NULL) { RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname); @@ -1366,28 +1366,27 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) /* - * UndistributableRelationDependencyOfFunction checks whether Citus can distribute - * dependent relations of the given function. If any non-distributable one found, it - * will be returned. + * GetUndistributableRelationDependency checks whether object has any non-distributable + * relation dependency. If any one found, it will be returned. */ static ObjectAddress * -UndistributableRelationDependencyOfFunction(ObjectAddress *functionAddress) +GetUndistributableRelationDependency(ObjectAddress *objectAddress) { - Assert(getObjectClass(functionAddress) == OCLASS_PROC); - - List *dependencies = GetDependenciesForObject(functionAddress); + List *dependencies = GetAllDependenciesForObject(objectAddress); ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { + if (IsObjectDistributed(dependency)) + { + continue; + } + if (getObjectClass(dependency) != OCLASS_CLASS) { continue; } /* - * Since GetDependenciesForObject returns only non-distributed ones, any - * relation comes to that point is must be a non-distributed one. - * * Citus can only distribute dependent non-distributed sequence and composite * types. */ diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index a9a154242..a7f87965f 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -155,6 +155,8 @@ static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector, DependencyDefinition *definition); static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector, DependencyDefinition *definition); +static bool FollowAllDependencies(ObjectAddressCollector *collector, + DependencyDefinition *definition); static void ApplyAddToDependencyList(ObjectAddressCollector *collector, DependencyDefinition *definition); static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector, @@ -211,15 +213,39 @@ GetDependenciesForObject(const ObjectAddress *target) /* - * GetAllDependenciesForObject returns a list of all the ObjectAddresses to be - * created in order before the target object could safely be created on a - * worker. As a caller, you probably need GetDependenciesForObject() which - * eliminates already distributed objects from the returned list. + * GetAllSupportedDependenciesForObject returns a list of all the ObjectAddresses to be + * created in order before the target object could safely be created on a worker, if all + * dependent objects are distributable. As a caller, you probably need to use + * GetDependenciesForObject() which eliminates already distributed objects from the returned + * list. * * Some of the object might already be created on a worker. It should be created * in an idempotent way. */ List * +GetAllSupportedDependenciesForObject(const ObjectAddress *target) +{ + ObjectAddressCollector collector = { 0 }; + InitObjectAddressCollector(&collector); + + RecurseObjectDependencies(*target, + &ExpandCitusSupportedTypes, + &FollowAllSupportedDependencies, + &ApplyAddToDependencyList, + &collector); + + return collector.dependencyList; +} + + +/* + * GetAllDependenciesForObject returns a list of all the dependent objects of the given + * object irrespective of whether the dependent object is supported by Citus or not. + * This function will be used to provide meaningful error messages if any dependent + * object for a given object is not supported. If you want to create dependencies for + * an object, you probably need to use GetDependenciesForObject(). + */ +List * GetAllDependenciesForObject(const ObjectAddress *target) { ObjectAddressCollector collector = { 0 }; @@ -227,7 +253,7 @@ GetAllDependenciesForObject(const ObjectAddress *target) RecurseObjectDependencies(*target, &ExpandCitusSupportedTypes, - &FollowAllSupportedDependencies, + &FollowAllDependencies, &ApplyAddToDependencyList, &collector); @@ -895,6 +921,55 @@ FollowAllSupportedDependencies(ObjectAddressCollector *collector, } +/* + * FollowAllDependencies applies filters on pg_depend entries to follow the dependency + * tree of objects in depth first order. We will visit all objects irrespective of it is + * supported by Citus or not. + */ +static bool +FollowAllDependencies(ObjectAddressCollector *collector, + DependencyDefinition *definition) +{ + if (definition->mode == DependencyPgDepend) + { + /* + * For dependencies found in pg_depend: + * + * Follow only normal and extension dependencies. The latter is used to reach the + * extensions, the objects that directly depend on the extension are eliminated + * during the "apply" phase. + * + * Other dependencies are internal dependencies and managed by postgres. + */ + if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL && + definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION) + { + return false; + } + } + + /* rest of the tests are to see if we want to follow the actual dependency */ + ObjectAddress address = DependencyDefinitionObjectAddress(definition); + + /* + * If the object is already in our dependency list we do not have to follow any + * further + */ + if (IsObjectAddressCollected(address, collector)) + { + return false; + } + + if (CitusExtensionObject(&address)) + { + /* following citus extension could complicate role management */ + return false; + } + + return true; +} + + /* * ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect * all the ObjectAddresses for pg_depend entries to the context. The context here is diff --git a/src/backend/distributed/test/dependency.c b/src/backend/distributed/test/dependency.c index 79ad1c139..82e818b8c 100644 --- a/src/backend/distributed/test/dependency.c +++ b/src/backend/distributed/test/dependency.c @@ -47,7 +47,7 @@ citus_get_all_dependencies_for_object(PG_FUNCTION_ARGS) ObjectAddress address = { 0 }; ObjectAddressSubSet(address, classid, objid, objsubid); - List *dependencies = GetAllDependenciesForObject(&address); + List *dependencies = GetAllSupportedDependenciesForObject(&address); ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { diff --git a/src/include/distributed/metadata/dependency.h b/src/include/distributed/metadata/dependency.h index d20103011..141b2a628 100644 --- a/src/include/distributed/metadata/dependency.h +++ b/src/include/distributed/metadata/dependency.h @@ -19,6 +19,7 @@ extern List * GetUniqueDependenciesList(List *objectAddressesList); extern List * GetDependenciesForObject(const ObjectAddress *target); +extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target); extern List * GetAllDependenciesForObject(const ObjectAddress *target); extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList); extern bool SupportedDependencyByCitus(const ObjectAddress *address); diff --git a/src/test/regress/sql/function_propagation.sql b/src/test/regress/sql/function_propagation.sql index 4ea65224b..2d40e244d 100644 --- a/src/test/regress/sql/function_propagation.sql +++ b/src/test/regress/sql/function_propagation.sql @@ -68,6 +68,10 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas -- Note that after pg 14 creating sequence doesn't create type -- it is expected for versions > pg14 to fail sequence tests below CREATE SEQUENCE function_prop_seq; + +-- Show that sequence is not distributed yet +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_seq'::regclass::oid; + CREATE OR REPLACE FUNCTION func_4(param_1 function_prop_seq) RETURNS int LANGUAGE plpgsql AS @@ -83,6 +87,10 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_4'::regproc::oid;$$) ORDER BY 1,2; CREATE SEQUENCE function_prop_seq_2; + +-- Show that sequence is not distributed yet +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_seq_2'::regclass::oid; + CREATE OR REPLACE FUNCTION func_5(param_1 int) RETURNS function_prop_seq_2 LANGUAGE plpgsql AS @@ -185,4 +193,19 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dis SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;$$) ORDER BY 1,2; +-- Test for SQL function with unsupported object in function body +CREATE TABLE table_in_sql_body(id int); + +CREATE FUNCTION max_of_table() +RETURNS int +LANGUAGE SQL AS +$$ + SELECT max(id) table_in_sql_body +$$; + +-- Show that only function has propagated, since the table is not resolved as dependency +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regclass::oid; +SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid; +SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid;$$) ORDER BY 1,2; + RESET search_path;