mirror of https://github.com/citusdata/citus.git
Make distributed object dependency logic follow upto extensions (#3195)
With this commit, we're slightly changing the dependency traversal logic to enable extension propagation. The main idea is to "follow" the extension dependencies, but do not "apply" them. Since some extension dependencies are base types, and base types could have circular dependencies, we implement a logic to prevent revisiting an already visited object.pull/3184/head
parent
1f46d47f36
commit
a4c90b6ee1
|
@ -30,14 +30,17 @@
|
|||
* ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used
|
||||
* together with recurse_pg_depend.
|
||||
*
|
||||
* We keep two different datastructures for the following reasons
|
||||
* We keep three different datastructures for the following reasons
|
||||
* - A List ordered by insert/collect order
|
||||
* - A Set to quickly O(1) check if an ObjectAddress has already been collected
|
||||
* - A set to check which objects are already visited
|
||||
*/
|
||||
typedef struct ObjectAddressCollector
|
||||
{
|
||||
List *dependencyList;
|
||||
HTAB *dependencySet;
|
||||
|
||||
HTAB *visitedObjects;
|
||||
} ObjectAddressCollector;
|
||||
|
||||
|
||||
|
@ -47,18 +50,28 @@ static void CollectObjectAddress(ObjectAddressCollector *collector, const
|
|||
ObjectAddress *address);
|
||||
static bool IsObjectAddressCollected(const ObjectAddress *findAddress,
|
||||
ObjectAddressCollector *collector);
|
||||
static void MarkObjectVisited(ObjectAddressCollector *collector,
|
||||
const ObjectAddress *target);
|
||||
static bool TargetObjectVisited(ObjectAddressCollector *collector,
|
||||
const ObjectAddress *target);
|
||||
|
||||
/* forward declaration of functions that recurse pg_depend */
|
||||
static void recurse_pg_depend(const ObjectAddress *target,
|
||||
List * (*expand)(void *context, const
|
||||
ObjectAddress *target),
|
||||
bool (*follow)(void *context, Form_pg_depend row),
|
||||
void (*apply)(void *context, Form_pg_depend row),
|
||||
void *context);
|
||||
static bool FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend);
|
||||
static bool FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend);
|
||||
static void ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend);
|
||||
static List * ExpandCitusSupportedTypes(void *context, const ObjectAddress *target);
|
||||
List * (*expand)(ObjectAddressCollector *collector,
|
||||
const ObjectAddress *target),
|
||||
bool (*follow)(ObjectAddressCollector *collector,
|
||||
Form_pg_depend row),
|
||||
void (*apply)(ObjectAddressCollector *collector,
|
||||
Form_pg_depend row),
|
||||
ObjectAddressCollector *collector);
|
||||
static bool FollowAllSupportedDependencies(ObjectAddressCollector *collector,
|
||||
Form_pg_depend pg_depend);
|
||||
static bool FollowNewSupportedDependencies(ObjectAddressCollector *collector,
|
||||
Form_pg_depend pg_depend);
|
||||
static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
|
||||
Form_pg_depend pg_depend);
|
||||
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
|
||||
const ObjectAddress *target);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -151,10 +164,11 @@ OrderObjectAddressListInDependencyOrder(List *objectAddressList)
|
|||
*/
|
||||
static void
|
||||
recurse_pg_depend(const ObjectAddress *target,
|
||||
List * (*expand)(void *context, const ObjectAddress *target),
|
||||
bool (*follow)(void *context, Form_pg_depend row),
|
||||
void (*apply)(void *context, Form_pg_depend row),
|
||||
void *context)
|
||||
List * (*expand)(ObjectAddressCollector *collector,
|
||||
const ObjectAddress *target),
|
||||
bool (*follow)(ObjectAddressCollector *collector, Form_pg_depend row),
|
||||
void (*apply)(ObjectAddressCollector *collector, Form_pg_depend row),
|
||||
ObjectAddressCollector *collector)
|
||||
{
|
||||
Relation depRel = NULL;
|
||||
ScanKeyData key[2];
|
||||
|
@ -163,6 +177,14 @@ recurse_pg_depend(const ObjectAddress *target,
|
|||
List *pgDependEntries = NIL;
|
||||
ListCell *pgDependCell = NULL;
|
||||
|
||||
if (TargetObjectVisited(collector, target))
|
||||
{
|
||||
/* prevent infinite loops due to circular dependencies */
|
||||
return;
|
||||
}
|
||||
|
||||
MarkObjectVisited(collector, target);
|
||||
|
||||
/*
|
||||
* iterate the actual pg_depend catalog
|
||||
*/
|
||||
|
@ -195,7 +217,7 @@ recurse_pg_depend(const ObjectAddress *target,
|
|||
{
|
||||
List *expandedEntries = NIL;
|
||||
|
||||
expandedEntries = expand(context, target);
|
||||
expandedEntries = expand(collector, target);
|
||||
pgDependEntries = list_concat(pgDependEntries, expandedEntries);
|
||||
}
|
||||
|
||||
|
@ -208,7 +230,7 @@ recurse_pg_depend(const ObjectAddress *target,
|
|||
ObjectAddress address = { 0 };
|
||||
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
|
||||
|
||||
if (follow == NULL || !follow(context, pg_depend))
|
||||
if (follow == NULL || !follow(collector, pg_depend))
|
||||
{
|
||||
/* skip all pg_depend entries the user didn't want to follow */
|
||||
continue;
|
||||
|
@ -218,12 +240,12 @@ recurse_pg_depend(const ObjectAddress *target,
|
|||
* recurse depth first, this makes sure we call apply for the deepest dependency
|
||||
* first.
|
||||
*/
|
||||
recurse_pg_depend(&address, expand, follow, apply, context);
|
||||
recurse_pg_depend(&address, expand, follow, apply, collector);
|
||||
|
||||
/* now apply changes for current entry */
|
||||
if (apply != NULL)
|
||||
{
|
||||
apply(context, pg_depend);
|
||||
apply(collector, pg_depend);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -251,6 +273,46 @@ InitObjectAddressCollector(ObjectAddressCollector *collector)
|
|||
|
||||
collector->dependencySet = hash_create("dependency set", 128, &info, hashFlags);
|
||||
collector->dependencyList = NULL;
|
||||
|
||||
collector->visitedObjects = hash_create("visited object set", 128, &info, hashFlags);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TargetObjectVisited returns true if the input target has been visited while
|
||||
* traversing pg_depend.
|
||||
*/
|
||||
static bool
|
||||
TargetObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target)
|
||||
{
|
||||
bool found = false;
|
||||
|
||||
/* find in set */
|
||||
hash_search(collector->visitedObjects, target, HASH_FIND, &found);
|
||||
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkObjectVisited marks the object as visited during the traversal of
|
||||
* pg_depend.
|
||||
*/
|
||||
static void
|
||||
MarkObjectVisited(ObjectAddressCollector *collector, const ObjectAddress *target)
|
||||
{
|
||||
ObjectAddress *address = NULL;
|
||||
bool found = false;
|
||||
|
||||
/* add to set */
|
||||
address = (ObjectAddress *) hash_search(collector->visitedObjects, target,
|
||||
HASH_ENTER, &found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
/* copy object address in */
|
||||
*address = *target;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -450,25 +512,21 @@ IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
|||
* objects which should be distributed before the root object can safely be created.
|
||||
*/
|
||||
static bool
|
||||
FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
||||
FollowNewSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
|
||||
pg_depend)
|
||||
{
|
||||
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
|
||||
ObjectAddress address = { 0 };
|
||||
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
|
||||
|
||||
/*
|
||||
* Distirbute only normal dependencies, other dependencies are internal dependencies
|
||||
* and managed by postgres
|
||||
* Follow only normal and extension dependencies. The latter is used to reach the
|
||||
* extensions, the objects that directly depend on the extension are eliminated
|
||||
* during the "apply" phase.
|
||||
*
|
||||
* Other dependencies are internal dependencies and managed by postgres.
|
||||
*/
|
||||
if (pg_depend->deptype != DEPENDENCY_NORMAL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* We can only distribute dependencies that citus knows how to distribute
|
||||
*/
|
||||
if (!SupportedDependencyByCitus(&address))
|
||||
if (pg_depend->deptype != DEPENDENCY_NORMAL &&
|
||||
pg_depend->deptype != DEPENDENCY_EXTENSION)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -492,10 +550,13 @@ FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
}
|
||||
|
||||
/*
|
||||
* Objects owned by an extension are assumed to be created on the workers by creating
|
||||
* the extension in the cluster
|
||||
* We can only distribute dependencies that citus knows how to distribute.
|
||||
*
|
||||
* But we don't want to bail out if the object is owned by extension, because
|
||||
* Citus can create the extension.
|
||||
*/
|
||||
if (IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
if (!SupportedDependencyByCitus(&address) &&
|
||||
!IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -510,25 +571,21 @@ FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
* This is used to sort a list of dependencies in dependency order.
|
||||
*/
|
||||
static bool
|
||||
FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
||||
FollowAllSupportedDependencies(ObjectAddressCollector *collector, Form_pg_depend
|
||||
pg_depend)
|
||||
{
|
||||
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
|
||||
ObjectAddress address = { 0 };
|
||||
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
|
||||
|
||||
/*
|
||||
* Distirbute only normal dependencies, other dependencies are internal dependencies
|
||||
* and managed by postgres
|
||||
* Follow only normal and extension dependencies. The latter is used to reach the
|
||||
* extensions, the objects that directly depend on the extension are eliminated
|
||||
* during the "apply" phase.
|
||||
*
|
||||
* Other dependencies are internal dependencies and managed by postgres.
|
||||
*/
|
||||
if (pg_depend->deptype != DEPENDENCY_NORMAL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* We can only distribute dependencies that citus knows how to distribute
|
||||
*/
|
||||
if (!SupportedDependencyByCitus(&address))
|
||||
if (pg_depend->deptype != DEPENDENCY_NORMAL &&
|
||||
pg_depend->deptype != DEPENDENCY_EXTENSION)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -543,10 +600,13 @@ FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
}
|
||||
|
||||
/*
|
||||
* Objects owned by an extension are assumed to be created on the workers by creating
|
||||
* the extension in the cluster
|
||||
* We can only distribute dependencies that citus knows how to distribute.
|
||||
*
|
||||
* But we don't want to bail out if the object is owned by extension, because
|
||||
* Citus can create the extension.
|
||||
*/
|
||||
if (IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
if (!SupportedDependencyByCitus(&address) &&
|
||||
!IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -562,12 +622,20 @@ FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend)
|
|||
* will be collected.
|
||||
*/
|
||||
static void
|
||||
ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend)
|
||||
ApplyAddToDependencyList(ObjectAddressCollector *collector, Form_pg_depend pg_depend)
|
||||
{
|
||||
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
|
||||
ObjectAddress address = { 0 };
|
||||
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
|
||||
|
||||
/*
|
||||
* Objects owned by an extension are assumed to be created on the workers by creating
|
||||
* the extension in the cluster, we we don't want explicitly create them.
|
||||
*/
|
||||
if (IsObjectAddressOwnedByExtension(&address, NULL))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
CollectObjectAddress(collector, &address);
|
||||
}
|
||||
|
||||
|
@ -581,7 +649,7 @@ ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend)
|
|||
* relation describing the type.
|
||||
*/
|
||||
static List *
|
||||
ExpandCitusSupportedTypes(void *context, const ObjectAddress *target)
|
||||
ExpandCitusSupportedTypes(ObjectAddressCollector *collector, const ObjectAddress *target)
|
||||
{
|
||||
List *result = NIL;
|
||||
|
||||
|
|
Loading…
Reference in New Issue