diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 043c16410..cecd7b10e 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -18,18 +18,20 @@ #include "catalog/indexing.h" #include "catalog/pg_class.h" #include "catalog/pg_depend.h" +#include "catalog/pg_shdepend.h" #include "catalog/pg_type.h" #include "distributed/commands/utility_hook.h" #include "distributed/listutils.h" #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" +#include "miscadmin.h" #include "utils/fmgroids.h" #include "utils/hsearch.h" #include "utils/lsyscache.h" /* * ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used - * together with recurse_pg_depend. + * together with RecurseObjectDependencies. * * We keep three different datastructures for the following reasons * - A List ordered by insert/collect order @@ -44,35 +46,88 @@ typedef struct ObjectAddressCollector HTAB *visitedObjects; } ObjectAddressCollector; +/* + * DependencyMode distinguishes the data stored in DependencyDefinition. For details see + * DependencyDefinition's inline comments in the data union. + */ +typedef enum DependencyMode +{ + DependencyObjectAddress, + DependencyPgDepend, + DependencyPgShDepend +} DependencyMode; + +typedef struct DependencyDefinition +{ + /* describe how the dependency data is stored in the data field */ + DependencyMode mode; + + /* + * Dependencies can be found in different ways and therefore stored differently on the + * definition. + */ + union + { + /* + * pg_depend is used for dependencies found in the database local pg_depend table. + * The entry is copied while scanning the table. The record can be inspected + * during the chasing algorithm to follow dependencies of different classes, or + * based on dependency type. 
+ */ + FormData_pg_depend pg_depend; + + /* + * pg_shdepend is used for dependencies found in the global pg_shdepend table. + * The entry is copied while scanning the table. The record can be inspected + * during the chasing algorithm to follow dependencies of different classes, or + * based on dependency type. + */ + FormData_pg_shdepend pg_shdepend; + + /* + * address is used for dependencies that are artificially added during the + * chasing. Since they are added by citus code we assume the dependency needs to + * be chased anyway; of course it will only actually be chased if the object is a + * supported object by citus + */ + ObjectAddress address; + } data; +} DependencyDefinition; + + +static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition); /* forward declarations for functions to interact with the ObjectAddressCollector */ static void InitObjectAddressCollector(ObjectAddressCollector *collector); static void CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *address); -static bool IsObjectAddressCollected(const ObjectAddress *findAddress, +static bool IsObjectAddressCollected(ObjectAddress findAddress, ObjectAddressCollector *collector); static void MarkObjectVisited(ObjectAddressCollector *collector, - const ObjectAddress *target); + ObjectAddress target); static bool TargetObjectVisited(ObjectAddressCollector *collector, - const ObjectAddress *target); + ObjectAddress target); + +typedef List *(*expandFn)(ObjectAddressCollector *collector, ObjectAddress target); +typedef bool (*followFn)(ObjectAddressCollector *collector, + DependencyDefinition *definition); +typedef void (*applyFn)(ObjectAddressCollector *collector, + DependencyDefinition *definition); /* forward declaration of functions that recurse pg_depend */ -static void recurse_pg_depend(const ObjectAddress *target, - List * (*expand)(ObjectAddressCollector *collector, - const ObjectAddress *target), - bool (*follow)(ObjectAddressCollector
*collector, - Form_pg_depend row), - void (*apply)(ObjectAddressCollector *collector, - Form_pg_depend row), - ObjectAddressCollector *collector); +static void RecurseObjectDependencies(ObjectAddress target, expandFn expand, + followFn follow, applyFn apply, + ObjectAddressCollector *collector); +static List * DependencyDefinitionFromPgDepend(ObjectAddress target); +static List * DependencyDefinitionFromPgShDepend(ObjectAddress target); static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector, - Form_pg_depend pg_depend); + DependencyDefinition *definition); static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector, - Form_pg_depend pg_depend); + DependencyDefinition *definition); static void ApplyAddToDependencyList(ObjectAddressCollector *collector, - Form_pg_depend pg_depend); + DependencyDefinition *definition); static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector, - const ObjectAddress *target); + ObjectAddress target); /* @@ -88,7 +143,7 @@ GetUniqueDependenciesList(List *objectAddressesList) ObjectAddress *objectAddress = NULL; foreach_ptr(objectAddress, objectAddressesList) { - if (IsObjectAddressCollected(objectAddress, &objectAddressCollector)) + if (IsObjectAddressCollected(*objectAddress, &objectAddressCollector)) { /* skip objects that are already collected */ continue; @@ -112,11 +167,11 @@ GetDependenciesForObject(const ObjectAddress *target) ObjectAddressCollector collector = { 0 }; InitObjectAddressCollector(&collector); - recurse_pg_depend(target, - &ExpandCitusSupportedTypes, - &FollowNewSupportedDependencies, - &ApplyAddToDependencyList, - &collector); + RecurseObjectDependencies(*target, + &ExpandCitusSupportedTypes, + &FollowNewSupportedDependencies, + &ApplyAddToDependencyList, + &collector); return collector.dependencyList; } @@ -143,17 +198,17 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList) ObjectAddress *objectAddress = NULL; foreach_ptr(objectAddress, 
objectAddressList) { - if (IsObjectAddressCollected(objectAddress, &collector)) + if (IsObjectAddressCollected(*objectAddress, &collector)) { /* skip objects that are already ordered */ continue; } - recurse_pg_depend(objectAddress, - &ExpandCitusSupportedTypes, - &FollowAllSupportedDependencies, - &ApplyAddToDependencyList, - &collector); + RecurseObjectDependencies(*objectAddress, + &ExpandCitusSupportedTypes, + &FollowAllSupportedDependencies, + &ApplyAddToDependencyList, + &collector); CollectObjectAddress(&collector, objectAddress); } @@ -163,22 +218,20 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList) /* - * recurse_pg_depend recursively visits pg_depend entries. + * RecurseObjectDependencies recursively visits all dependencies of an object. It sources + * the dependencies from pg_depend and pg_shdepend while 'expanding' the list via an + * optional `expand` function. * - * `expand` allows based on the target ObjectAddress to generate extra entries for ease of - * traversal. - * - * Starting from the target ObjectAddress. For every existing and generated entry the - * `follow` function will be called. When `follow` returns true it will recursively visit - * the dependencies for that object. recurse_pg_depend will visit therefore all pg_depend - * entries. + * Starting from the target ObjectAddress. For every dependency found the `follow` + * function will be called. When `follow` returns true it will recursively visit the + * dependencies for that object. * * Visiting will happen in depth first order, which is useful to create or sorted lists of * dependencies to create. * - * For all pg_depend entries that should be visited the apply function will be called. - * This function is designed to be the mutating function for the context being passed. - * Although nothing prevents the follow function to also mutate the context. + * For all dependencies that should be visited the apply function will be called. 
This + * function is designed to be the mutating function for the context being passed. Although + * nothing prevents the follow function to also mutate the context. * * - follow will be called on the way down, so the invocation order is top to bottom of * the dependency tree @@ -186,17 +239,9 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList) * not called for entries for which follow has returned false. */ static void -recurse_pg_depend(const ObjectAddress *target, - List * (*expand)(ObjectAddressCollector *collector, - const ObjectAddress *target), - bool (*follow)(ObjectAddressCollector *collector, Form_pg_depend row), - void (*apply)(ObjectAddressCollector *collector, Form_pg_depend row), - ObjectAddressCollector *collector) +RecurseObjectDependencies(ObjectAddress target, expandFn expand, followFn follow, + applyFn apply, ObjectAddressCollector *collector) { - ScanKeyData key[2]; - HeapTuple depTup = NULL; - List *pgDependEntries = NIL; - if (TargetObjectVisited(collector, target)) { /* prevent infinite loops due to circular dependencies */ @@ -205,51 +250,24 @@ recurse_pg_depend(const ObjectAddress *target, MarkObjectVisited(collector, target); - /* - * iterate the actual pg_depend catalog - */ - Relation depRel = heap_open(DependRelationId, AccessShareLock); + /* lookup both pg_depend and pg_shdepend for dependencies */ + List *pgDependDefinitions = DependencyDefinitionFromPgDepend(target); + List *pgShDependDefinitions = DependencyDefinitionFromPgShDepend(target); + List *dependenyDefinitionList = list_concat(pgDependDefinitions, + pgShDependDefinitions); - /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */ - ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(target->classId)); - ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(target->objectId)); - SysScanDesc depScan = systable_beginscan(depRel, DependDependerIndexId, 
true, NULL, 2, - key); - - while (HeapTupleIsValid(depTup = systable_getnext(depScan))) - { - Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); - Form_pg_depend pg_depend_copy = palloc0(sizeof(FormData_pg_depend)); - - *pg_depend_copy = *pg_depend; - - pgDependEntries = lappend(pgDependEntries, pg_depend_copy); - } - - systable_endscan(depScan); - relation_close(depRel, AccessShareLock); - - /* - * concat expanded entries if applicable - */ + /* concat expanded entries if applicable */ if (expand != NULL) { List *expandedEntries = expand(collector, target); - pgDependEntries = list_concat(pgDependEntries, expandedEntries); + dependenyDefinitionList = list_concat(dependenyDefinitionList, expandedEntries); } - /* - * Iterate all entries and recurse depth first - */ - Form_pg_depend pg_depend = NULL; - foreach_ptr(pg_depend, pgDependEntries) + /* iterate all entries and recurse depth first */ + DependencyDefinition *dependencyDefinition = NULL; + foreach_ptr(dependencyDefinition, dependenyDefinitionList) { - ObjectAddress address = { 0 }; - ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); - - if (follow == NULL || !follow(collector, pg_depend)) + if (follow == NULL || !follow(collector, dependencyDefinition)) { /* skip all pg_depend entries the user didn't want to follow */ continue; @@ -259,17 +277,107 @@ recurse_pg_depend(const ObjectAddress *target, * recurse depth first, this makes sure we call apply for the deepest dependency * first. */ - recurse_pg_depend(&address, expand, follow, apply, collector); + ObjectAddress address = DependencyDefinitionObjectAddress(dependencyDefinition); + RecurseObjectDependencies(address, expand, follow, apply, collector); /* now apply changes for current entry */ if (apply != NULL) { - apply(collector, pg_depend); + apply(collector, dependencyDefinition); } } } +/* + * DependencyDefinitionFromPgDepend loads all pg_depend records describing the + * dependencies of target. 
+ */ +static List * +DependencyDefinitionFromPgDepend(ObjectAddress target) +{ + ScanKeyData key[2]; + HeapTuple depTup = NULL; + List *dependenyDefinitionList = NIL; + + /* + * iterate the actual pg_depend catalog + */ + Relation depRel = heap_open(DependRelationId, AccessShareLock); + + /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */ + ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target.classId)); + ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target.objectId)); + SysScanDesc depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, + key); + + while (HeapTupleIsValid(depTup = systable_getnext(depScan))) + { + Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); + DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition)); + + /* keep track of all pg_depend records as dependency definitions */ + dependency->mode = DependencyPgDepend; + dependency->data.pg_depend = *pg_depend; + dependenyDefinitionList = lappend(dependenyDefinitionList, dependency); + } + + systable_endscan(depScan); + relation_close(depRel, AccessShareLock); + + return dependenyDefinitionList; +} + + +/* + * DependencyDefinitionFromPgShDepend loads all pg_shdepend records describing the + * dependencies of target.
+ */ +static List * +DependencyDefinitionFromPgShDepend(ObjectAddress target) +{ + ScanKeyData key[3]; + HeapTuple depTup = NULL; + List *dependenyDefinitionList = NIL; + + /* + * iterate the actual pg_shdepend catalog + */ + Relation shdepRel = heap_open(SharedDependRelationId, AccessShareLock); + + /* + * Scan pg_shdepend for dbid = $1 AND classid = $2 AND objid = $3 using + * pg_shdepend_depender_index + */ + ScanKeyInit(&key[0], Anum_pg_shdepend_dbid, BTEqualStrategyNumber, F_OIDEQ, + MyDatabaseId); + ScanKeyInit(&key[1], Anum_pg_shdepend_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target.classId)); + ScanKeyInit(&key[2], Anum_pg_shdepend_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target.objectId)); + SysScanDesc shdepScan = systable_beginscan(shdepRel, SharedDependDependerIndexId, + true, NULL, 3, key); + + while (HeapTupleIsValid(depTup = systable_getnext(shdepScan))) + { + Form_pg_shdepend pg_shdepend = (Form_pg_shdepend) GETSTRUCT(depTup); + DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition)); + + /* keep track of all pg_shdepend records as dependency definitions */ + dependency->mode = DependencyPgShDepend; + dependency->data.pg_shdepend = *pg_shdepend; + dependenyDefinitionList = lappend(dependenyDefinitionList, dependency); + } + + systable_endscan(shdepScan); + relation_close(shdepRel, AccessShareLock); + + return dependenyDefinitionList; +} + + /* * InitObjectAddressCollector takes a pointer to an already allocated (possibly stack) * ObjectAddressCollector struct. It makes sure this struct is ready to be used for object @@ -301,12 +409,12 @@ InitObjectAddressCollector(ObjectAddressCollector *collector) * traversing pg_depend. 
*/ static bool -TargetObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target) +TargetObjectVisited(ObjectAddressCollector *collector, ObjectAddress target) { bool found = false; /* find in set */ - hash_search(collector->visitedObjects, target, HASH_FIND, &found); + hash_search(collector->visitedObjects, &target, HASH_FIND, &found); return found; } @@ -317,19 +425,19 @@ TargetObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *targ * pg_depend. */ static void -MarkObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target) +MarkObjectVisited(ObjectAddressCollector *collector, ObjectAddress target) { bool found = false; /* add to set */ ObjectAddress *address = (ObjectAddress *) hash_search(collector->visitedObjects, - target, + &target, HASH_ENTER, &found); if (!found) { /* copy object address in */ - *address = *target; + *address = target; } } @@ -363,13 +471,13 @@ CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *col * already in a (unsorted) list of ObjectAddresses */ static bool -IsObjectAddressCollected(const ObjectAddress *findAddress, +IsObjectAddressCollected(ObjectAddress findAddress, ObjectAddressCollector *collector) { bool found = false; /* add to set */ - hash_search(collector->dependencySet, findAddress, HASH_FIND, &found); + hash_search(collector->dependencySet, &findAddress, HASH_FIND, &found); return found; } @@ -535,30 +643,35 @@ IsObjectAddressOwnedByExtension(const ObjectAddress *target, * objects which should be distributed before the root object can safely be created. */ static bool -FollowNewSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend - pg_depend) +FollowNewSupportedDependencies(ObjectAddressCollector *collector, + DependencyDefinition *definition) { - ObjectAddress address = { 0 }; - ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); - - /* - * Follow only normal and extension dependencies. 
The latter is used to reach the - * extensions, the objects that directly depend on the extension are eliminated - * during the "apply" phase. - * - * Other dependencies are internal dependencies and managed by postgres. - */ - if (pg_depend->deptype != DEPENDENCY_NORMAL && - pg_depend->deptype != DEPENDENCY_EXTENSION) + if (definition->mode == DependencyPgDepend) { - return false; + /* + * For dependencies found in pg_depend: + * + * Follow only normal and extension dependencies. The latter is used to reach the + * extensions, the objects that directly depend on the extension are eliminated + * during the "apply" phase. + * + * Other dependencies are internal dependencies and managed by postgres. + */ + if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL && + definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION) + { + return false; + } } + /* rest of the tests are to see if we want to follow the actual dependency */ + ObjectAddress address = DependencyDefinitionObjectAddress(definition); + /* * If the object is already in our dependency list we do not have to follow any * further */ - if (IsObjectAddressCollected(&address, collector)) + if (IsObjectAddressCollected(address, collector)) { return false; } @@ -600,30 +713,35 @@ FollowNewSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend * This is used to sort a list of dependencies in dependency order. */ static bool -FollowAllSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend - pg_depend) +FollowAllSupportedDependencies(ObjectAddressCollector *collector, + DependencyDefinition *definition) { - ObjectAddress address = { 0 }; - ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); - - /* - * Follow only normal and extension dependencies. The latter is used to reach the - * extensions, the objects that directly depend on the extension are eliminated - * during the "apply" phase. 
- * - * Other dependencies are internal dependencies and managed by postgres. - */ - if (pg_depend->deptype != DEPENDENCY_NORMAL && - pg_depend->deptype != DEPENDENCY_EXTENSION) + if (definition->mode == DependencyPgDepend) { - return false; + /* + * For dependencies found in pg_depend: + * + * Follow only normal and extension dependencies. The latter is used to reach the + * extensions, the objects that directly depend on the extension are eliminated + * during the "apply" phase. + * + * Other dependencies are internal dependencies and managed by postgres. + */ + if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL && + definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION) + { + return false; + } } + /* rest of the tests are to see if we want to follow the actual dependency */ + ObjectAddress address = DependencyDefinitionObjectAddress(definition); + /* * If the object is already in our dependency list we do not have to follow any * further */ - if (IsObjectAddressCollected(&address, collector)) + if (IsObjectAddressCollected(address, collector)) { return false; } @@ -651,20 +769,23 @@ FollowAllSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend /* - * ApplyAddToDependencyList is an apply function for recurse_pg_depend that will collect + * ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect * all the ObjectAddresses for pg_depend entries to the context. The context here is * assumed to be a (ObjectAddressCollector *) to the location where all ObjectAddresses * will be collected. 
*/ static void -ApplyAddToDependencyList(ObjectAddressCollector *collector, Form_pg_depend pg_depend) +ApplyAddToDependencyList(ObjectAddressCollector *collector, + DependencyDefinition *definition) { - ObjectAddress address = { 0 }; - ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); + ObjectAddress address = DependencyDefinitionObjectAddress(definition); /* * Objects owned by an extension are assumed to be created on the workers by creating * the extension in the cluster, we we don't want explicitly create them. + * + * Since we do need to capture the extension as a dependency we are following the + * object instead of breaking the traversal there. */ if (IsObjectAddressOwnedByExtension(&address, NULL)) { @@ -684,11 +805,11 @@ ApplyAddToDependencyList(ObjectAddressCollector *collector, Form_pg_depend pg_de * relation describing the type. */ static List * -ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress *target) +ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress target) { List *result = NIL; - switch (target->classId) + switch (target.classId) { case TypeRelationId: { @@ -697,19 +818,13 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress * are described with their dependencies by the relation that describes the * composite type. 
*/ - if (get_typtype(target->objectId) == TYPTYPE_COMPOSITE) + if (get_typtype(target.objectId) == TYPTYPE_COMPOSITE) { - Form_pg_depend dependency = palloc0(sizeof(FormData_pg_depend)); - dependency->classid = target->classId; - dependency->objid = target->objectId; - dependency->objsubid = target->objectSubId; - - /* add outward edge to the type's relation */ - dependency->refclassid = RelationRelationId; - dependency->refobjid = get_typ_typrelid(target->objectId); - dependency->refobjsubid = 0; - - dependency->deptype = DEPENDENCY_NORMAL; + DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition)); + dependency->mode = DependencyObjectAddress; + ObjectAddressSet(dependency->data.address, + RelationRelationId, + get_typ_typrelid(target.objectId)); result = lappend(result, dependency); } @@ -720,19 +835,13 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress * as that would cause a cyclic dependency on others, instead we expand here * to follow the dependency on the element type. 
+ */ - if (type_is_array(target->objectId)) + if (type_is_array(target.objectId)) { - Form_pg_depend dependency = palloc0(sizeof(FormData_pg_depend)); - dependency->classid = target->classId; - dependency->objid = target->objectId; - dependency->objsubid = target->objectSubId; - - /* add outward edge to the element type */ - dependency->refclassid = TypeRelationId; - dependency->refobjid = get_element_type(target->objectId); - dependency->refobjsubid = 0; - - dependency->deptype = DEPENDENCY_NORMAL; + DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition)); + dependency->mode = DependencyObjectAddress; + ObjectAddressSet(dependency->data.address, + TypeRelationId, + get_element_type(target.objectId)); result = lappend(result, dependency); } @@ -748,3 +857,40 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress } return result; } + + +/* + * DependencyDefinitionObjectAddress returns the object address of the dependency defined + * by the dependency definition, regardless of what the source of the definition is + */ +static ObjectAddress +DependencyDefinitionObjectAddress(DependencyDefinition *definition) +{ + switch (definition->mode) + { + case DependencyObjectAddress: + { + return definition->data.address; + } + + case DependencyPgDepend: + { + ObjectAddress address = { 0 }; + ObjectAddressSet(address, + definition->data.pg_depend.refclassid, + definition->data.pg_depend.refobjid); + return address; + } + + case DependencyPgShDepend: + { + ObjectAddress address = { 0 }; + ObjectAddressSet(address, + definition->data.pg_shdepend.refclassid, + definition->data.pg_shdepend.refobjid); + return address; + } + } + + ereport(ERROR, (errmsg("unsupported dependency definition mode"))); +}