Refactor dependency resolution and resolve from pg_shdepend (#3633)

DESCRIPTION: Refactor dependency resolution and resolve from pg_shdepend

This PR refactors how dependencies are resolved by no longer assuming that a dependency is always described by a `pg_depend` record. Instead, we keep a definition of the dependency around, which records how the dependency was resolved. This can be in one of the following ways:
 - `pg_depend`, data will contain a copy of the `pg_depend` record
 - `pg_shdepend`, data will contain a copy of the `pg_shdepend` record
 - `ObjectAddress`, data will contain only an `ObjectAddress` describing a dependency

Regardless of the way the dependency was found, it will always be possible to get to the address of the dependency, as that is its most important property.

For some checks we can inspect the source where the dependency was found and perform a deep inspection to decide if we want to follow the dependency. This is important to not distribute dependencies coming from extensions for example.
pull/3648/head^2
Nils Dijk 2020-03-25 13:38:25 +01:00 committed by GitHub
parent eaaf302795
commit 4e611cfc25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 304 additions and 158 deletions

View File

@ -18,18 +18,20 @@
#include "catalog/indexing.h"
#include "catalog/pg_class.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_type.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "miscadmin.h"
#include "utils/fmgroids.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
/*
* ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used
* together with recurse_pg_depend.
* together with RecurseObjectDependencies.
*
* We keep three different datastructures for the following reasons
* - A List ordered by insert/collect order
@ -44,35 +46,88 @@ typedef struct ObjectAddressCollector
HTAB *visitedObjects;
} ObjectAddressCollector;
/*
 * DependencyMode tags a DependencyDefinition with the member of its data union that is
 * populated, i.e. with the source the dependency was found in.
 */
typedef enum DependencyMode
{
	/* data.address is set: only the ObjectAddress of the dependency is known */
	DependencyObjectAddress = 0,

	/* data.pg_depend is set: a copy of the pg_depend record */
	DependencyPgDepend = 1,

	/* data.pg_shdepend is set: a copy of the pg_shdepend record */
	DependencyPgShDepend = 2
} DependencyMode;
/*
 * DependencyDefinition describes a single dependency of an object together with the
 * source the dependency was found in. The mode field tells which member of the data
 * union is valid; DependencyDefinitionObjectAddress extracts the address of the
 * dependency for every mode.
 */
typedef struct DependencyDefinition
{
/* describe how the dependency data is stored in the data field */
DependencyMode mode;
/*
* Dependencies can be found in different ways and are therefore stored differently on
* the definition. Only the member matching mode may be read.
*/
union
{
/*
* pg_depend is used for dependencies found in the database local pg_depend table.
* The entry is copied while scanning the table. The record can be inspected
* during the chasing algorithm to follow dependencies of different classes, or
* based on dependency type.
*/
FormData_pg_depend pg_depend;
/*
* pg_shdepend is used for dependencies found in the global pg_shdepend table.
* The entry is copied while scanning the table. The record can be inspected
* during the chasing algorithm to follow dependencies of different classes, or
* based on dependency type.
*/
FormData_pg_shdepend pg_shdepend;
/*
* address is used for dependencies that are artificially added during the
* chasing. Since they are added by citus code we assume the dependency needs to
* be chased anyway; of course it will only actually be chased if the object is
* supported by citus.
*/
ObjectAddress address;
} data;
} DependencyDefinition;
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
/* forward declarations for functions to interact with the ObjectAddressCollector */
static void InitObjectAddressCollector(ObjectAddressCollector *collector);
static void CollectObjectAddress(ObjectAddressCollector *collector,
const ObjectAddress *address);
static bool IsObjectAddressCollected(const ObjectAddress *findAddress,
static bool IsObjectAddressCollected(ObjectAddress findAddress,
ObjectAddressCollector *collector);
static void MarkObjectVisited(ObjectAddressCollector *collector,
const ObjectAddress *target);
ObjectAddress target);
static bool TargetObjectVisited(ObjectAddressCollector *collector,
const ObjectAddress *target);
ObjectAddress target);
typedef List *(*expandFn)(ObjectAddressCollector *collector, ObjectAddress target);
typedef bool (*followFn)(ObjectAddressCollector *collector,
DependencyDefinition *definition);
typedef void (*applyFn)(ObjectAddressCollector *collector,
DependencyDefinition *definition);
/* forward declaration of functions that recurse pg_depend */
static void recurse_pg_depend(const ObjectAddress *target,
List * (*expand)(ObjectAddressCollector *collector,
const ObjectAddress *target),
bool (*follow)(ObjectAddressCollector *collector,
Form_pg_depend row),
void (*apply)(ObjectAddressCollector *collector,
Form_pg_depend row),
static void RecurseObjectDependencies(ObjectAddress target, expandFn expand,
followFn follow, applyFn apply,
ObjectAddressCollector *collector);
static List * DependencyDefinitionFromPgDepend(ObjectAddress target);
static List * DependencyDefinitionFromPgShDepend(ObjectAddress target);
static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector,
Form_pg_depend pg_depend);
DependencyDefinition *definition);
static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector,
Form_pg_depend pg_depend);
DependencyDefinition *definition);
static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
Form_pg_depend pg_depend);
DependencyDefinition *definition);
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
const ObjectAddress *target);
ObjectAddress target);
/*
@ -88,7 +143,7 @@ GetUniqueDependenciesList(List *objectAddressesList)
ObjectAddress *objectAddress = NULL;
foreach_ptr(objectAddress, objectAddressesList)
{
if (IsObjectAddressCollected(objectAddress, &objectAddressCollector))
if (IsObjectAddressCollected(*objectAddress, &objectAddressCollector))
{
/* skip objects that are already collected */
continue;
@ -112,7 +167,7 @@ GetDependenciesForObject(const ObjectAddress *target)
ObjectAddressCollector collector = { 0 };
InitObjectAddressCollector(&collector);
recurse_pg_depend(target,
RecurseObjectDependencies(*target,
&ExpandCitusSupportedTypes,
&FollowNewSupportedDependencies,
&ApplyAddToDependencyList,
@ -143,13 +198,13 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList)
ObjectAddress *objectAddress = NULL;
foreach_ptr(objectAddress, objectAddressList)
{
if (IsObjectAddressCollected(objectAddress, &collector))
if (IsObjectAddressCollected(*objectAddress, &collector))
{
/* skip objects that are already ordered */
continue;
}
recurse_pg_depend(objectAddress,
RecurseObjectDependencies(*objectAddress,
&ExpandCitusSupportedTypes,
&FollowAllSupportedDependencies,
&ApplyAddToDependencyList,
@ -163,22 +218,20 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList)
/*
* recurse_pg_depend recursively visits pg_depend entries.
* RecurseObjectDependencies recursively visits all dependencies of an object. It sources
* the dependencies from pg_depend and pg_shdepend while 'expanding' the list via an
* optional `expand` function.
*
* `expand` allows based on the target ObjectAddress to generate extra entries for ease of
* traversal.
*
* Starting from the target ObjectAddress. For every existing and generated entry the
* `follow` function will be called. When `follow` returns true it will recursively visit
* the dependencies for that object. recurse_pg_depend will visit therefore all pg_depend
* entries.
* Starting from the target ObjectAddress. For every dependency found the `follow`
* function will be called. When `follow` returns true it will recursively visit the
* dependencies for that object.
*
* Visiting will happen in depth first order, which is useful to create sorted lists of
* dependencies to create.
*
* For all pg_depend entries that should be visited the apply function will be called.
* This function is designed to be the mutating function for the context being passed.
* Although nothing prevents the follow function to also mutate the context.
* For all dependencies that should be visited the apply function will be called. This
* function is designed to be the mutating function for the context being passed. Although
* nothing prevents the follow function to also mutate the context.
*
* - follow will be called on the way down, so the invocation order is top to bottom of
* the dependency tree
@ -186,17 +239,9 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList)
* not called for entries for which follow has returned false.
*/
static void
recurse_pg_depend(const ObjectAddress *target,
List * (*expand)(ObjectAddressCollector *collector,
const ObjectAddress *target),
bool (*follow)(ObjectAddressCollector *collector, Form_pg_depend row),
void (*apply)(ObjectAddressCollector *collector, Form_pg_depend row),
ObjectAddressCollector *collector)
RecurseObjectDependencies(ObjectAddress target, expandFn expand, followFn follow,
applyFn apply, ObjectAddressCollector *collector)
{
ScanKeyData key[2];
HeapTuple depTup = NULL;
List *pgDependEntries = NIL;
if (TargetObjectVisited(collector, target))
{
/* prevent infinite loops due to circular dependencies */
@ -205,51 +250,24 @@ recurse_pg_depend(const ObjectAddress *target,
MarkObjectVisited(collector, target);
/*
* iterate the actual pg_depend catalog
*/
Relation depRel = heap_open(DependRelationId, AccessShareLock);
/* lookup both pg_depend and pg_shdepend for dependencies */
List *pgDependDefinitions = DependencyDefinitionFromPgDepend(target);
List *pgShDependDefinitions = DependencyDefinitionFromPgShDepend(target);
List *dependenyDefinitionList = list_concat(pgDependDefinitions,
pgShDependDefinitions);
/* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */
ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->classId));
ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->objectId));
SysScanDesc depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2,
key);
while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
{
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
Form_pg_depend pg_depend_copy = palloc0(sizeof(FormData_pg_depend));
*pg_depend_copy = *pg_depend;
pgDependEntries = lappend(pgDependEntries, pg_depend_copy);
}
systable_endscan(depScan);
relation_close(depRel, AccessShareLock);
/*
* concat expanded entries if applicable
*/
/* concat expanded entries if applicable */
if (expand != NULL)
{
List *expandedEntries = expand(collector, target);
pgDependEntries = list_concat(pgDependEntries, expandedEntries);
dependenyDefinitionList = list_concat(dependenyDefinitionList, expandedEntries);
}
/*
* Iterate all entries and recurse depth first
*/
Form_pg_depend pg_depend = NULL;
foreach_ptr(pg_depend, pgDependEntries)
/* iterate all entries and recurse depth first */
DependencyDefinition *dependencyDefinition = NULL;
foreach_ptr(dependencyDefinition, dependenyDefinitionList)
{
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
if (follow == NULL || !follow(collector, pg_depend))
if (follow == NULL || !follow(collector, dependencyDefinition))
{
/* skip all pg_depend entries the user didn't want to follow */
continue;
@ -259,17 +277,107 @@ recurse_pg_depend(const ObjectAddress *target,
* recurse depth first, this makes sure we call apply for the deepest dependency
* first.
*/
recurse_pg_depend(&address, expand, follow, apply, collector);
ObjectAddress address = DependencyDefinitionObjectAddress(dependencyDefinition);
RecurseObjectDependencies(address, expand, follow, apply, collector);
/* now apply changes for current entry */
if (apply != NULL)
{
apply(collector, pg_depend);
apply(collector, dependencyDefinition);
}
}
}
/*
 * DependencyDefinitionFromPgDepend scans pg_depend for all records in which target is
 * the depending object and returns them as a List of DependencyDefinition pointers.
 *
 * Every record is copied into a freshly palloc0'd DependencyDefinition that is tagged
 * with DependencyPgDepend. NIL is returned when target has no pg_depend records.
 */
static List *
DependencyDefinitionFromPgDepend(ObjectAddress target)
{
	List *definitions = NIL;
	ScanKeyData scanKeys[2];

	Relation pgDepend = heap_open(DependRelationId, AccessShareLock);

	/* match classid = $1 AND objid = $2, served by pg_depend_depender_index */
	ScanKeyInit(&scanKeys[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(target.classId));
	ScanKeyInit(&scanKeys[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(target.objectId));

	SysScanDesc scan = systable_beginscan(pgDepend, DependDependerIndexId, true, NULL,
										  2, scanKeys);

	HeapTuple tuple = NULL;
	for (;;)
	{
		tuple = systable_getnext(scan);
		if (!HeapTupleIsValid(tuple))
		{
			break;
		}

		/*
		 * Copy the record by value; the definition is appended to the returned list
		 * and is still read after the scan has ended.
		 */
		DependencyDefinition *definition = palloc0(sizeof(DependencyDefinition));
		definition->mode = DependencyPgDepend;
		definition->data.pg_depend = *((Form_pg_depend) GETSTRUCT(tuple));

		definitions = lappend(definitions, definition);
	}

	systable_endscan(scan);
	relation_close(pgDepend, AccessShareLock);

	return definitions;
}
/*
 * DependencyDefinitionFromPgShDepend loads all pg_shdepend records describing the
 * dependencies of target.
 *
 * Only records registered for the current database (dbid = MyDatabaseId) in which
 * target is the depending object are considered. Every record is copied into a freshly
 * palloc0'd DependencyDefinition that is tagged with DependencyPgShDepend. NIL is
 * returned when target has no matching pg_shdepend records.
 */
static List *
DependencyDefinitionFromPgShDepend(ObjectAddress target)
{
	ScanKeyData key[3];
	HeapTuple depTup = NULL;
	List *dependencyDefinitionList = NIL;

	/*
	 * iterate the actual pg_shdepend catalog
	 */
	Relation shdepRel = heap_open(SharedDependRelationId, AccessShareLock);

	/*
	 * Scan pg_shdepend for dbid = $1 AND classid = $2 AND objid = $3 using
	 * pg_shdepend_depender_index
	 */
	ScanKeyInit(&key[0], Anum_pg_shdepend_dbid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(MyDatabaseId));
	ScanKeyInit(&key[1], Anum_pg_shdepend_classid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(target.classId));
	ScanKeyInit(&key[2], Anum_pg_shdepend_objid, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(target.objectId));

	SysScanDesc shdepScan = systable_beginscan(shdepRel, SharedDependDependerIndexId,
											   true, NULL, 3, key);

	while (HeapTupleIsValid(depTup = systable_getnext(shdepScan)))
	{
		Form_pg_shdepend pg_shdepend = (Form_pg_shdepend) GETSTRUCT(depTup);
		DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));

		/*
		 * Keep track of all pg_shdepend records as dependency definitions. The record
		 * is copied by value; the definition is appended to the returned list and is
		 * still read after the scan has ended.
		 */
		dependency->mode = DependencyPgShDepend;
		dependency->data.pg_shdepend = *pg_shdepend;

		dependencyDefinitionList = lappend(dependencyDefinitionList, dependency);
	}

	systable_endscan(shdepScan);
	relation_close(shdepRel, AccessShareLock);

	return dependencyDefinitionList;
}
/*
* InitObjectAddressCollector takes a pointer to an already allocated (possibly stack)
* ObjectAddressCollector struct. It makes sure this struct is ready to be used for object
@ -301,12 +409,12 @@ InitObjectAddressCollector(ObjectAddressCollector *collector)
* traversing pg_depend.
*/
static bool
TargetObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target)
TargetObjectVisited(ObjectAddressCollector *collector, ObjectAddress target)
{
bool found = false;
/* find in set */
hash_search(collector->visitedObjects, target, HASH_FIND, &found);
hash_search(collector->visitedObjects, &target, HASH_FIND, &found);
return found;
}
@ -317,19 +425,19 @@ TargetObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *targ
* pg_depend.
*/
static void
MarkObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target)
MarkObjectVisited(ObjectAddressCollector *collector, ObjectAddress target)
{
bool found = false;
/* add to set */
ObjectAddress *address = (ObjectAddress *) hash_search(collector->visitedObjects,
target,
&target,
HASH_ENTER, &found);
if (!found)
{
/* copy object address in */
*address = *target;
*address = target;
}
}
@ -363,13 +471,13 @@ CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *col
* already in a (unsorted) list of ObjectAddresses
*/
static bool
IsObjectAddressCollected(const ObjectAddress *findAddress,
IsObjectAddressCollected(ObjectAddress findAddress,
ObjectAddressCollector *collector)
{
bool found = false;
/* add to set */
hash_search(collector->dependencySet, findAddress, HASH_FIND, &found);
hash_search(collector->dependencySet, &findAddress, HASH_FIND, &found);
return found;
}
@ -535,30 +643,35 @@ IsObjectAddressOwnedByExtension(const ObjectAddress *target,
* objects which should be distributed before the root object can safely be created.
*/
static bool
FollowNewSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
pg_depend)
FollowNewSupportedDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition)
{
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
if (definition->mode == DependencyPgDepend)
{
/*
* For dependencies found in pg_depend:
*
* Follow only normal and extension dependencies. The latter is used to reach the
* extensions, the objects that directly depend on the extension are eliminated
* during the "apply" phase.
*
* Other dependencies are internal dependencies and managed by postgres.
*/
if (pg_depend->deptype != DEPENDENCY_NORMAL &&
pg_depend->deptype != DEPENDENCY_EXTENSION)
if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL &&
definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION)
{
return false;
}
}
/* rest of the tests are to see if we want to follow the actual dependency */
ObjectAddress address = DependencyDefinitionObjectAddress(definition);
/*
* If the object is already in our dependency list we do not have to follow any
* further
*/
if (IsObjectAddressCollected(&address, collector))
if (IsObjectAddressCollected(address, collector))
{
return false;
}
@ -600,30 +713,35 @@ FollowNewSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
* This is used to sort a list of dependencies in dependency order.
*/
static bool
FollowAllSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
pg_depend)
FollowAllSupportedDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition)
{
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
if (definition->mode == DependencyPgDepend)
{
/*
* For dependencies found in pg_depend:
*
* Follow only normal and extension dependencies. The latter is used to reach the
* extensions, the objects that directly depend on the extension are eliminated
* during the "apply" phase.
*
* Other dependencies are internal dependencies and managed by postgres.
*/
if (pg_depend->deptype != DEPENDENCY_NORMAL &&
pg_depend->deptype != DEPENDENCY_EXTENSION)
if (definition->data.pg_depend.deptype != DEPENDENCY_NORMAL &&
definition->data.pg_depend.deptype != DEPENDENCY_EXTENSION)
{
return false;
}
}
/* rest of the tests are to see if we want to follow the actual dependency */
ObjectAddress address = DependencyDefinitionObjectAddress(definition);
/*
* If the object is already in our dependency list we do not have to follow any
* further
*/
if (IsObjectAddressCollected(&address, collector))
if (IsObjectAddressCollected(address, collector))
{
return false;
}
@ -651,20 +769,23 @@ FollowAllSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
/*
* ApplyAddToDependencyList is an apply function for recurse_pg_depend that will collect
* ApplyAddToDependencyList is an apply function for RecurseObjectDependencies that will collect
* all the ObjectAddresses for pg_depend entries to the context. The context here is
* assumed to be a (ObjectAddressCollector *) to the location where all ObjectAddresses
* will be collected.
*/
static void
ApplyAddToDependencyList(ObjectAddressCollector *collector, Form_pg_depend pg_depend)
ApplyAddToDependencyList(ObjectAddressCollector *collector,
DependencyDefinition *definition)
{
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
ObjectAddress address = DependencyDefinitionObjectAddress(definition);
/*
* Objects owned by an extension are assumed to be created on the workers by creating
* the extension in the cluster, so we don't want to explicitly create them.
*
* Since we do need to capture the extension as a dependency we are following the
* object instead of breaking the traversal there.
*/
if (IsObjectAddressOwnedByExtension(&address, NULL))
{
@ -684,11 +805,11 @@ ApplyAddToDependencyList(ObjectAddressCollector *collector, Form_pg_depend pg_de
* relation describing the type.
*/
static List *
ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress *target)
ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress target)
{
List *result = NIL;
switch (target->classId)
switch (target.classId)
{
case TypeRelationId:
{
@ -697,19 +818,13 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress
* are described with their dependencies by the relation that describes the
* composite type.
*/
if (get_typtype(target->objectId) == TYPTYPE_COMPOSITE)
if (get_typtype(target.objectId) == TYPTYPE_COMPOSITE)
{
Form_pg_depend dependency = palloc0(sizeof(FormData_pg_depend));
dependency->classid = target->classId;
dependency->objid = target->objectId;
dependency->objsubid = target->objectSubId;
/* add outward edge to the type's relation */
dependency->refclassid = RelationRelationId;
dependency->refobjid = get_typ_typrelid(target->objectId);
dependency->refobjsubid = 0;
dependency->deptype = DEPENDENCY_NORMAL;
DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));
dependency->mode = DependencyObjectAddress;
ObjectAddressSet(dependency->data.address,
RelationRelationId,
get_typ_typrelid(target.objectId));
result = lappend(result, dependency);
}
@ -720,19 +835,13 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress
* as that would cause a cyclic dependency on others, instead we expand here
* to follow the dependency on the element type.
*/
if (type_is_array(target->objectId))
if (type_is_array(target.objectId))
{
Form_pg_depend dependency = palloc0(sizeof(FormData_pg_depend));
dependency->classid = target->classId;
dependency->objid = target->objectId;
dependency->objsubid = target->objectSubId;
/* add outward edge to the element type */
dependency->refclassid = TypeRelationId;
dependency->refobjid = get_element_type(target->objectId);
dependency->refobjsubid = 0;
dependency->deptype = DEPENDENCY_NORMAL;
DependencyDefinition *dependency = palloc0(sizeof(DependencyDefinition));
dependency->mode = DependencyObjectAddress;
ObjectAddressSet(dependency->data.address,
TypeRelationId,
get_element_type(target.objectId));
result = lappend(result, dependency);
}
@ -748,3 +857,40 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress
}
return result;
}
/*
 * DependencyDefinitionObjectAddress returns the ObjectAddress of the object that is
 * depended upon, regardless of the source the definition was created from.
 *
 * For definitions sourced from pg_depend or pg_shdepend the address is built from the
 * refclassid and refobjid of the copied record. An unknown mode raises an ERROR.
 */
static ObjectAddress
DependencyDefinitionObjectAddress(DependencyDefinition *definition)
{
	ObjectAddress result = { 0 };

	switch (definition->mode)
	{
		case DependencyObjectAddress:
		{
			/* the address is stored as is */
			result = definition->data.address;
			return result;
		}

		case DependencyPgDepend:
		{
			ObjectAddressSet(result,
							 definition->data.pg_depend.refclassid,
							 definition->data.pg_depend.refobjid);
			return result;
		}

		case DependencyPgShDepend:
		{
			ObjectAddressSet(result,
							 definition->data.pg_shdepend.refclassid,
							 definition->data.pg_shdepend.refobjid);
			return result;
		}
	}

	/* only reached when mode holds a value that is not a DependencyMode member */
	ereport(ERROR, (errmsg("unsupported dependency definition mode")));
}