Refactor EnsureDependenciesExistOnAllNodes to introduce EnsureObjectExistOnAllNodes

reassign_owned_prop_onur
Onur Tirtir 2023-11-23 16:43:08 +03:00
parent 557dd71133
commit 480175d49e
4 changed files with 113 additions and 33 deletions

View File

@ -29,20 +29,65 @@
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
typedef enum RequiredObjectSet
{
REQUIRE_ONLY_DEPENDENCIES = 1,
REQUIRE_OBJECT_AND_DEPENDENCIES = 2,
} RequiredObjectSet;
static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b);
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
static void EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);
/*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes. If not available they will be created on the
* nodes via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
* EnsureObjectExistOnAllNodes is a wrapper around
* EnsureRequiredObjectExistOnAllNodes to ensure the "object itself" (together
* with its dependencies) is available on all nodes.
*
* See EnsureRequiredObjectExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
void
EnsureObjectExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_OBJECT_AND_DEPENDENCIES);
}
/*
* EnsureDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectExistOnAllNodes to ensure "all dependencies" of given
* object --but not the object itself-- are available on all nodes.
*
* See EnsureRequiredObjectExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES);
}
/*
* EnsureRequiredObjectExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes if required object set is REQUIRE_ONLY_DEPENDENCIES.
* Otherwise, i.e., if required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this
* function creates the object itself on all nodes too. This function ensures that each
* of the dependencies are supported by Citus but doesn't check the same for the target
* object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES) is provided because we assume that
* callers don't call this function for an unsupported function at all.
*
* If not available, they will be created on the nodes via a separate session that will be
* committed directly so that the objects are visible to potentially multiple sessions creating
* the shards.
*
* Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be
@ -53,29 +98,51 @@ static char * DropTableIfExistsCommand(Oid relationId);
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet)
{
List *dependenciesWithCommands = NIL;
Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES ||
requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES);
List *objectsWithCommands = NIL;
List *ddlCommands = NULL;
/*
* If there is any unsupported dependency or circular dependency exists, Citus can
* not ensure dependencies will exist on all nodes.
*
* Note that we don't check whether "target" is distributable (in case
* REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers
* to not even call this function if Citus doesn't know how to propagate
* "target" object itself.
*/
EnsureDependenciesCanBeDistributed(target);
/* collect all dependencies in creation order and get their ddl commands */
List *dependencies = GetDependenciesForObject(target);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
List *objectsToBeCreated = GetDependenciesForObject(target);
/*
* Append the target object to make sure that it's created after its
* dependencies are created, if requested.
*/
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
objectsToBeCreated = lappend(objectsToBeCreated, targetCopy);
}
ObjectAddress *object = NULL;
foreach_ptr(object, objectsToBeCreated)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(object);
ddlCommands = list_concat(ddlCommands, dependencyCommands);
/* create a new list with dependencies that actually created commands */
/* create a new list with objects that actually created commands */
if (list_length(dependencyCommands) > 0)
{
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
objectsWithCommands = lappend(objectsWithCommands, object);
}
}
if (list_length(ddlCommands) <= 0)
@ -98,26 +165,28 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);
/*
* Lock dependent objects explicitly to make sure same DDL command won't be sent
* Lock objects to be created explicitly to make sure same DDL command won't be sent
* multiple times from parallel sessions.
*
* Sort dependencies that will be created on workers to not to have any deadlock
* Sort the objects that will be created on workers to not to have any deadlock
* issue if different sessions are creating different objects.
*/
List *addressSortedDependencies = SortList(dependenciesWithCommands,
List *addressSortedDependencies = SortList(objectsWithCommands,
ObjectAddressComparator);
foreach_ptr(dependency, addressSortedDependencies)
foreach_ptr(object, addressSortedDependencies)
{
LockDatabaseObject(dependency->classId, dependency->objectId,
dependency->objectSubId, ExclusiveLock);
LockDatabaseObject(object->classId, object->objectId,
object->objectSubId, ExclusiveLock);
}
/*
* We need to propagate dependencies via the current user's metadata connection if
* any dependency for the target is created in the current transaction. Our assumption
* is that if we rely on a dependency created in the current transaction, then the
* current user, most probably, has permissions to create the target object as well.
* We need to propagate objects via the current user's metadata connection if
* any of the objects that we're interested in are created in the current transaction.
* Our assumption is that if we rely on an object created in the current transaction,
* then the current user, most probably, has permissions to create the target object
* as well.
*
* Note that, user still may not be able to create the target due to no permissions
* for any of its dependencies. But this is ok since it should be rare.
*
@ -125,7 +194,18 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* have visibility issues since propagated dependencies would be invisible to
* the separate connection until we locally commit.
*/
if (HasAnyDependencyInPropagatedObjects(target))
List *createdObjectList = GetAllSupportedDependenciesForObject(target);
/* consider target as well if we're requested to create it too */
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
createdObjectList = lappend(createdObjectList, targetCopy);
}
if (HasAnyObjectInPropagatedObjects(createdObjectList))
{
SendCommandListToRemoteNodesWithMetadata(ddlCommands);
}
@ -148,7 +228,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail.
*/
foreach_ptr(dependency, dependenciesWithCommands)
foreach_ptr(object, objectsWithCommands)
{
/*
* pg_dist_object entries must be propagated with the super user, since
@ -158,7 +238,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* Only dependent object's metadata should be propagated with super user.
* Metadata of the table itself must be propagated with the current user.
*/
MarkObjectDistributedViaSuperUser(dependency);
MarkObjectDistributedViaSuperUser(object);
}
}

View File

@ -1139,18 +1139,17 @@ ResetPropagatedObjects(void)
/*
* HasAnyDependencyInPropagatedObjects decides if any dependency of given object is
* HasAnyObjectInPropagatedObjects decides if any of the objects in given list are
* propagated in the current transaction.
*/
bool
HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
HasAnyObjectInPropagatedObjects(List *objectList)
{
List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
ObjectAddress *object = NULL;
foreach_ptr(object, objectList)
{
/* first search in root transaction */
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency))
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, object))
{
return true;
}
@ -1163,7 +1162,7 @@ HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
SubXactContext *state = NULL;
foreach_ptr(state, activeSubXactContexts)
{
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency))
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, object))
{
return true;
}

View File

@ -385,6 +385,7 @@ extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operat
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern void UndistributeTables(List *relationIdList);
extern void EnsureObjectExistOnAllNodes(const ObjectAddress *target);
extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets);
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
ObjectAddress *

View File

@ -164,7 +164,7 @@ extern bool MaybeExecutingUDF(void);
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTableAndSequences(Oid relationId);
extern void ResetPropagatedObjects(void);
extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress);
extern bool HasAnyObjectInPropagatedObjects(List *objectList);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);