From 480175d49e89dd7ec38d3f51f9ba922fad3c9b0c Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 23 Nov 2023 16:43:08 +0300 Subject: [PATCH] Refactor EnsureDependenciesExistOnAllNodes to introduce EnsureObjectExistOnAllNodes --- .../distributed/commands/dependencies.c | 130 ++++++++++++++---- .../transaction/transaction_management.c | 13 +- src/include/distributed/metadata_utility.h | 1 + .../distributed/transaction_management.h | 2 +- 4 files changed, 113 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index e309ee86c..430726814 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -29,20 +29,65 @@ #include "storage/lmgr.h" #include "utils/lsyscache.h" +typedef enum RequiredObjectSet +{ + REQUIRE_ONLY_DEPENDENCIES = 1, + REQUIRE_OBJECT_AND_DEPENDENCIES = 2, +} RequiredObjectSet; + static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress); static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress); static int ObjectAddressComparator(const void *a, const void *b); static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); +static void EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target, + RequiredObjectSet requiredObjectSet); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static bool ShouldPropagateObject(const ObjectAddress *address); static char * DropTableIfExistsCommand(Oid relationId); /* - * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes - * sure these are available on all nodes. If not available they will be created on the - * nodes via a separate session that will be committed directly so that the objects are - * visible to potentially multiple sessions creating the shards. 
+ * EnsureObjectExistOnAllNodes is a wrapper around + * EnsureRequiredObjectExistOnAllNodes to ensure the "object itself" (together + * with its dependencies) is available on all nodes. + * + * See EnsureRequiredObjectExistOnAllNodes to learn more about how this + * function deals with an object created within the same transaction. + */ +void +EnsureObjectExistOnAllNodes(const ObjectAddress *target) +{ + EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_OBJECT_AND_DEPENDENCIES); +} + + +/* + * EnsureDependenciesExistOnAllNodes is a wrapper around + * EnsureRequiredObjectExistOnAllNodes to ensure "all dependencies" of given + * object --but not the object itself-- are available on all nodes. + * + * See EnsureRequiredObjectExistOnAllNodes to learn more about how this + * function deals with an object created within the same transaction. + */ +static void +EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) +{ + EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES); +} + + +/* + * EnsureRequiredObjectExistOnAllNodes finds all the dependencies that we support and makes + * sure these are available on all nodes if required object set is REQUIRE_ONLY_DEPENDENCIES. + * Otherwise, i.e., if required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this + * function creates the object itself on all nodes too. This function ensures that each + * of the dependencies is supported by Citus but doesn't check the same for the target + * object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we assume that + * callers don't call this function for an unsupported object at all. + * + * If not available, they will be created on the nodes via a separate session that will be + * committed directly so that the objects are visible to potentially multiple sessions creating + * the shards. * * Note; only the actual objects are created via a separate session, the records to * pg_dist_object are created in this session. 
As a side effect the objects could be @@ -53,29 +98,51 @@ static char * DropTableIfExistsCommand(Oid relationId); * postgres native CREATE IF NOT EXISTS, or citus helper functions. */ static void -EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) +EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target, + RequiredObjectSet requiredObjectSet) { - List *dependenciesWithCommands = NIL; + Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES || + requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES); + + List *objectsWithCommands = NIL; List *ddlCommands = NULL; /* * If there is any unsupported dependency or circular dependency exists, Citus can * not ensure dependencies will exist on all nodes. + * + * Note that we don't check whether "target" is distributable (in case + * REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers + * to not even call this function if Citus doesn't know how to propagate + * "target" object itself. */ EnsureDependenciesCanBeDistributed(target); /* collect all dependencies in creation order and get their ddl commands */ - List *dependencies = GetDependenciesForObject(target); - ObjectAddress *dependency = NULL; - foreach_ptr(dependency, dependencies) + List *objectsToBeCreated = GetDependenciesForObject(target); + + /* + * Append the target object to make sure that it's created after its + * dependencies are created, if requested. 
+ */ + if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES) { - List *dependencyCommands = GetDependencyCreateDDLCommands(dependency); + ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress)); + *targetCopy = *target; + + objectsToBeCreated = lappend(objectsToBeCreated, targetCopy); + } + + ObjectAddress *object = NULL; + foreach_ptr(object, objectsToBeCreated) + { + List *dependencyCommands = GetDependencyCreateDDLCommands(object); ddlCommands = list_concat(ddlCommands, dependencyCommands); - /* create a new list with dependencies that actually created commands */ + /* create a new list with objects that actually created commands */ if (list_length(dependencyCommands) > 0) { - dependenciesWithCommands = lappend(dependenciesWithCommands, dependency); + objectsWithCommands = lappend(objectsWithCommands, object); } } if (list_length(ddlCommands) <= 0) @@ -98,26 +165,28 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock); /* - * Lock dependent objects explicitly to make sure same DDL command won't be sent + * Lock objects to be created explicitly to make sure same DDL command won't be sent * multiple times from parallel sessions. * - * Sort dependencies that will be created on workers to not to have any deadlock + * Sort the objects that will be created on workers to not to have any deadlock * issue if different sessions are creating different objects. 
*/ - List *addressSortedDependencies = SortList(dependenciesWithCommands, + List *addressSortedDependencies = SortList(objectsWithCommands, ObjectAddressComparator); - foreach_ptr(dependency, addressSortedDependencies) + foreach_ptr(object, addressSortedDependencies) { - LockDatabaseObject(dependency->classId, dependency->objectId, - dependency->objectSubId, ExclusiveLock); + LockDatabaseObject(object->classId, object->objectId, + object->objectSubId, ExclusiveLock); } /* - * We need to propagate dependencies via the current user's metadata connection if - * any dependency for the target is created in the current transaction. Our assumption - * is that if we rely on a dependency created in the current transaction, then the - * current user, most probably, has permissions to create the target object as well. + * We need to propagate objects via the current user's metadata connection if + * any of the objects that we're interested in are created in the current transaction. + * Our assumption is that if we rely on an object created in the current transaction, + * then the current user, most probably, has permissions to create the target object + * as well. + * * Note that, user still may not be able to create the target due to no permissions * for any of its dependencies. But this is ok since it should be rare. * @@ -125,7 +194,18 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * have visibility issues since propagated dependencies would be invisible to * the separate connection until we locally commit. 
*/ - if (HasAnyDependencyInPropagatedObjects(target)) + List *createdObjectList = GetAllSupportedDependenciesForObject(target); + + /* consider target as well if we're requested to create it too */ + if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES) + { + ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress)); + *targetCopy = *target; + + createdObjectList = lappend(createdObjectList, targetCopy); + } + + if (HasAnyObjectInPropagatedObjects(createdObjectList)) { SendCommandListToRemoteNodesWithMetadata(ddlCommands); } @@ -148,7 +228,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * that objects have been created on remote nodes before marking them * distributed, so MarkObjectDistributed wouldn't fail. */ - foreach_ptr(dependency, dependenciesWithCommands) + foreach_ptr(object, objectsWithCommands) { /* * pg_dist_object entries must be propagated with the super user, since @@ -158,7 +238,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * Only dependent object's metadata should be propagated with super user. * Metadata of the table itself must be propagated with the current user. */ - MarkObjectDistributedViaSuperUser(dependency); + MarkObjectDistributedViaSuperUser(object); } } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 9a7bd9089..d8b8dfe50 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -1139,18 +1139,17 @@ ResetPropagatedObjects(void) /* - * HasAnyDependencyInPropagatedObjects decides if any dependency of given object is + * HasAnyObjectInPropagatedObjects decides if any of the objects in given list are * propagated in the current transaction. 
*/ bool -HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) +HasAnyObjectInPropagatedObjects(List *objectList) { - List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress); - ObjectAddress *dependency = NULL; - foreach_ptr(dependency, dependencyList) + ObjectAddress *object = NULL; + foreach_ptr(object, objectList) { /* first search in root transaction */ - if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency)) + if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, object)) { return true; } @@ -1163,7 +1162,7 @@ HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) SubXactContext *state = NULL; foreach_ptr(state, activeSubXactContexts) { - if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency)) + if (DependencyInPropagatedObjectsHash(state->propagatedObjects, object)) { return true; } diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 7e50a6af6..764ebd4ca 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -385,6 +385,7 @@ extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operat extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); extern void UndistributeTables(List *relationIdList); +extern void EnsureObjectExistOnAllNodes(const ObjectAddress *target); extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets); extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const ObjectAddress * diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index ca4e632a9..9b41b060e 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -164,7 +164,7 @@ extern bool MaybeExecutingUDF(void); extern void TrackPropagatedObject(const ObjectAddress *objectAddress); 
extern void TrackPropagatedTableAndSequences(Oid relationId); extern void ResetPropagatedObjects(void); -extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress); +extern bool HasAnyObjectInPropagatedObjects(List *objectList); /* initialization function(s) */ extern void InitializeTransactionManagement(void);