Add all object dependency check

velioglu/tmpfuncprop
Burak Velioglu 2022-02-15 17:37:48 +03:00
parent 110aff95c8
commit c7b8c4db34
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 117 additions and 19 deletions

View File

@ -83,7 +83,7 @@ static void EnsureSequentialModeForFunctionDDL(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode); static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
static ObjectAddress * UndistributableRelationDependencyOfFunction( static ObjectAddress * GetUndistributableRelationDependency(
ObjectAddress *functionAddress); ObjectAddress *functionAddress);
static ObjectAddress FunctionToObjectAddress(ObjectType objectType, static ObjectAddress FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs, ObjectWithArgs *objectWithArgs,
@ -1337,8 +1337,8 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
return NIL; return NIL;
} }
ObjectAddress *undistributableDependency = ObjectAddress *undistributableDependency = GetUndistributableRelationDependency(
UndistributableRelationDependencyOfFunction(&functionAddress); &functionAddress);
if (undistributableDependency != NULL) if (undistributableDependency != NULL)
{ {
RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname); RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname);
@ -1366,28 +1366,27 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
/* /*
* UndistributableRelationDependencyOfFunction checks whether Citus can distribute * GetUndistributableRelationDependency checks whether object has any non-distributable
* dependent relations of the given function. If any non-distributable one found, it * relation dependency. If any one found, it will be returned.
* will be returned.
*/ */
static ObjectAddress * static ObjectAddress *
UndistributableRelationDependencyOfFunction(ObjectAddress *functionAddress) GetUndistributableRelationDependency(ObjectAddress *objectAddress)
{ {
Assert(getObjectClass(functionAddress) == OCLASS_PROC); List *dependencies = GetAllDependenciesForObject(objectAddress);
List *dependencies = GetDependenciesForObject(functionAddress);
ObjectAddress *dependency = NULL; ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies) foreach_ptr(dependency, dependencies)
{ {
if (IsObjectDistributed(dependency))
{
continue;
}
if (getObjectClass(dependency) != OCLASS_CLASS) if (getObjectClass(dependency) != OCLASS_CLASS)
{ {
continue; continue;
} }
/* /*
* Since GetDependenciesForObject returns only non-distributed ones, any
* relation comes to that point is must be a non-distributed one.
*
* Citus can only distribute dependent non-distributed sequence and composite * Citus can only distribute dependent non-distributed sequence and composite
* types. * types.
*/ */

View File

@ -155,6 +155,8 @@ static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition); DependencyDefinition *definition);
static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector, static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition); DependencyDefinition *definition);
static bool FollowAllDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition);
static void ApplyAddToDependencyList(ObjectAddressCollector *collector, static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
DependencyDefinition *definition); DependencyDefinition *definition);
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector, static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
@ -211,15 +213,39 @@ GetDependenciesForObject(const ObjectAddress *target)
/* /*
* GetAllDependenciesForObject returns a list of all the ObjectAddresses to be * GetAllSupportedDependenciesForObject returns a list of all the ObjectAddresses to be
* created in order before the target object could safely be created on a * created in order before the target object could safely be created on a worker, if all
* worker. As a caller, you probably need GetDependenciesForObject() which * dependent objects are distributable. As a caller, you probably need to use
* eliminates already distributed objects from the returned list. * GetDependenciesForObject() which eliminates already distributed objects from the returned
* list.
* *
* Some of the object might already be created on a worker. It should be created * Some of the object might already be created on a worker. It should be created
* in an idempotent way. * in an idempotent way.
*/ */
List * List *
GetAllSupportedDependenciesForObject(const ObjectAddress *target)
{
ObjectAddressCollector collector = { 0 };
InitObjectAddressCollector(&collector);
RecurseObjectDependencies(*target,
&ExpandCitusSupportedTypes,
&FollowAllSupportedDependencies,
&ApplyAddToDependencyList,
&collector);
return collector.dependencyList;
}
/*
* GetAllDependenciesForObject returns a list of all the dependent objects of the given
* object irrespective of whether the dependent object is supported by Citus or not.
* This function will be used to provide meaningful error messages if any dependent
* object for a given object is not supported. If you want to create dependencies for
* an object, you probably need to use GetDependenciesForObject().
*/
List *
GetAllDependenciesForObject(const ObjectAddress *target) GetAllDependenciesForObject(const ObjectAddress *target)
{ {
ObjectAddressCollector collector = { 0 }; ObjectAddressCollector collector = { 0 };
@ -227,7 +253,7 @@ GetAllDependenciesForObject(const ObjectAddress *target)
RecurseObjectDependencies(*target, RecurseObjectDependencies(*target,
&ExpandCitusSupportedTypes, &ExpandCitusSupportedTypes,
&FollowAllSupportedDependencies, &FollowAllDependencies,
&ApplyAddToDependencyList, &ApplyAddToDependencyList,
&collector); &collector);
@ -895,6 +921,55 @@ FollowAllSupportedDependencies(ObjectAddressCollector *collector,
} }
/*
* FollowAllDependencies applies filters on pg_depend entries to follow the dependency
* tree of objects in depth first order. We will visit all objects irrespective of it is
* supported by Citus or not.
*/
static bool
FollowAllDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition)
{
if (definition->mode == DependencyPgDepend)
{
/*
* For dependencies found in pg_depend:
*
* Follow only normal and extension dependencies. The latter is used to reach the
* extensions, the objects that directly depend on the extension are eliminated
* during the "apply" phase.
*
* Other dependencies are internal dependencies and managed by postgres.
*/
if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL &&
definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION)
{
return false;
}
}
/* rest of the tests are to see if we want to follow the actual dependency */
ObjectAddress address = DependencyDefinitionObjectAddress(definition);
/*
* If the object is already in our dependency list we do not have to follow any
* further
*/
if (IsObjectAddressCollected(address, collector))
{
return false;
}
if (CitusExtensionObject(&address))
{
/* following citus extension could complicate role management */
return false;
}
return true;
}
/* /*
* ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect * ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect
* all the ObjectAddresses for pg_depend entries to the context. The context here is * all the ObjectAddresses for pg_depend entries to the context. The context here is

View File

@ -47,7 +47,7 @@ citus_get_all_dependencies_for_object(PG_FUNCTION_ARGS)
ObjectAddress address = { 0 }; ObjectAddress address = { 0 };
ObjectAddressSubSet(address, classid, objid, objsubid); ObjectAddressSubSet(address, classid, objid, objsubid);
List *dependencies = GetAllDependenciesForObject(&address); List *dependencies = GetAllSupportedDependenciesForObject(&address);
ObjectAddress *dependency = NULL; ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies) foreach_ptr(dependency, dependencies)
{ {

View File

@ -19,6 +19,7 @@
extern List * GetUniqueDependenciesList(List *objectAddressesList); extern List * GetUniqueDependenciesList(List *objectAddressesList);
extern List * GetDependenciesForObject(const ObjectAddress *target); extern List * GetDependenciesForObject(const ObjectAddress *target);
extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target);
extern List * GetAllDependenciesForObject(const ObjectAddress *target); extern List * GetAllDependenciesForObject(const ObjectAddress *target);
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList); extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
extern bool SupportedDependencyByCitus(const ObjectAddress *address); extern bool SupportedDependencyByCitus(const ObjectAddress *address);

View File

@ -68,6 +68,10 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas
-- Note that after pg 14 creating sequence doesn't create type -- Note that after pg 14 creating sequence doesn't create type
-- it is expected for versions > pg14 to fail sequence tests below -- it is expected for versions > pg14 to fail sequence tests below
CREATE SEQUENCE function_prop_seq; CREATE SEQUENCE function_prop_seq;
-- Show that sequence is not distributed yet
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_seq'::regclass::oid;
CREATE OR REPLACE FUNCTION func_4(param_1 function_prop_seq) CREATE OR REPLACE FUNCTION func_4(param_1 function_prop_seq)
RETURNS int RETURNS int
LANGUAGE plpgsql AS LANGUAGE plpgsql AS
@ -83,6 +87,10 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_4'::regproc::oid;$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_4'::regproc::oid;$$) ORDER BY 1,2;
CREATE SEQUENCE function_prop_seq_2; CREATE SEQUENCE function_prop_seq_2;
-- Show that sequence is not distributed yet
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_seq_2'::regclass::oid;
CREATE OR REPLACE FUNCTION func_5(param_1 int) CREATE OR REPLACE FUNCTION func_5(param_1 int)
RETURNS function_prop_seq_2 RETURNS function_prop_seq_2
LANGUAGE plpgsql AS LANGUAGE plpgsql AS
@ -185,4 +193,19 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dis
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regtype::oid;$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_in_transaction'::regproc::oid;$$) ORDER BY 1,2;
-- Test for SQL function with unsupported object in function body
CREATE TABLE table_in_sql_body(id int);
CREATE FUNCTION max_of_table()
RETURNS int
LANGUAGE SQL AS
$$
SELECT max(id) table_in_sql_body
$$;
-- Show that only function has propagated, since the table is not resolved as dependency
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.type_in_transaction'::regclass::oid;
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid;
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.max_of_table'::regproc::oid;$$) ORDER BY 1,2;
RESET search_path; RESET search_path;