Test fixes for other tests

pull/7384/head
gurkanindibay 2023-12-17 07:15:15 +03:00
parent 0c8095e49d
commit 13b03dc73d
5 changed files with 34 additions and 120 deletions

View File

@ -31,65 +31,20 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
typedef enum RequiredObjectSet
{
REQUIRE_ONLY_DEPENDENCIES = 1,
REQUIRE_OBJECT_AND_DEPENDENCIES = 2,
} RequiredObjectSet;
static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b);
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
static void EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);
/*
* EnsureObjectAndDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectExistOnAllNodes to ensure the "object itself" (together
* with its dependencies) is available on all nodes.
*
* See EnsureRequiredObjectExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
void
EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_OBJECT_AND_DEPENDENCIES);
}
/*
* EnsureDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectExistOnAllNodes to ensure "all dependencies" of given
* object --but not the object itself-- are available on all nodes.
*
* See EnsureRequiredObjectExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES);
}
/*
* EnsureRequiredObjectExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes if required object set is REQUIRE_ONLY_DEPENDENCIES.
* Otherwise, i.e., if required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this
* function creates the object itself on all nodes too. This function ensures that each
* of the dependencies are supported by Citus but doesn't check the same for the target
* object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES) is provided because we assume that
* callers don't call this function for an unsupported function at all.
*
* If not available, they will be created on the nodes via a separate session that will be
* committed directly so that the objects are visible to potentially multiple sessions creating
* the shards.
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes. If not available they will be created on the
* nodes via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
*
* Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be
@ -100,57 +55,29 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
static void
EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet)
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) target)))
{
/* do not propagate for non-distributed types */
return;
}
Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES ||
requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES);
List *objectsWithCommands = NIL;
List *dependenciesWithCommands = NIL;
List *ddlCommands = NULL;
/*
* If there is any unsupported dependency or circular dependency exists, Citus can
* not ensure dependencies will exist on all nodes.
*
* Note that we don't check whether "target" is distributable (in case
* REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers
* to not even call this function if Citus doesn't know how to propagate
* "target" object itself.
*/
EnsureDependenciesCanBeDistributed(target);
/* collect all dependencies in creation order and get their ddl commands */
List *objectsToBeCreated = GetDependenciesForObject(target);
/*
* Append the target object to make sure that it's created after its
* dependencies are created, if requested.
*/
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
List *dependencies = GetDependenciesForObject(target);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
objectsToBeCreated = lappend(objectsToBeCreated, targetCopy);
}
ObjectAddress *object = NULL;
foreach_ptr(object, objectsToBeCreated)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(object);
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ddlCommands = list_concat(ddlCommands, dependencyCommands);
/* create a new list with objects that actually created commands */
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
objectsWithCommands = lappend(objectsWithCommands, object);
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
}
}
if (list_length(ddlCommands) <= 0)
@ -173,28 +100,26 @@ EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);
/*
* Lock objects to be created explicitly to make sure same DDL command won't be sent
* Lock dependent objects explicitly to make sure same DDL command won't be sent
* multiple times from parallel sessions.
*
* Sort the objects that will be created on workers to not to have any deadlock
* Sort dependencies that will be created on workers to not to have any deadlock
* issue if different sessions are creating different objects.
*/
List *addressSortedDependencies = SortList(objectsWithCommands,
List *addressSortedDependencies = SortList(dependenciesWithCommands,
ObjectAddressComparator);
foreach_ptr(object, addressSortedDependencies)
foreach_ptr(dependency, addressSortedDependencies)
{
LockDatabaseObject(object->classId, object->objectId,
object->objectSubId, ExclusiveLock);
LockDatabaseObject(dependency->classId, dependency->objectId,
dependency->objectSubId, ExclusiveLock);
}
/*
* We need to propagate objects via the current user's metadata connection if
* any of the objects that we're interested in are created in the current transaction.
* Our assumption is that if we rely on an object created in the current transaction,
* then the current user, most probably, has permissions to create the target object
* as well.
*
* We need to propagate dependencies via the current user's metadata connection if
* any dependency for the target is created in the current transaction. Our assumption
* is that if we rely on a dependency created in the current transaction, then the
* current user, most probably, has permissions to create the target object as well.
* Note that, user still may not be able to create the target due to no permissions
* for any of its dependencies. But this is ok since it should be rare.
*
@ -202,18 +127,7 @@ EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
* have visibility issues since propagated dependencies would be invisible to
* the separate connection until we locally commit.
*/
List *createdObjectList = GetAllSupportedDependenciesForObject(target);
/* consider target as well if we're requested to create it too */
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
createdObjectList = lappend(createdObjectList, targetCopy);
}
if (HasAnyObjectInPropagatedObjects(createdObjectList))
if (HasAnyDependencyInPropagatedObjects(target))
{
SendCommandListToRemoteNodesWithMetadata(ddlCommands);
}
@ -236,7 +150,7 @@ EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
* that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail.
*/
foreach_ptr(object, objectsWithCommands)
foreach_ptr(dependency, dependenciesWithCommands)
{
/*
* pg_dist_object entries must be propagated with the super user, since
@ -246,7 +160,7 @@ EnsureRequiredObjectExistOnAllNodes(const ObjectAddress *target,
* Only dependent object's metadata should be propagated with super user.
* Metadata of the table itself must be propagated with the current user.
*/
MarkObjectDistributedViaSuperUser(object);
MarkObjectDistributedViaSuperUser(dependency);
}
}

View File

@ -149,7 +149,7 @@ PostprocessReassignOwnedStmt(Node *node, const char *queryString)
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
EnsureObjectAndDependenciesExistOnAllNodes(newRoleAddress);
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(newRoleAddress));
/* rollback GUCs to the state before this session */
AtEOXact_GUC(true, saveNestLevel);

View File

@ -1139,17 +1139,18 @@ ResetPropagatedObjects(void)
/*
* HasAnyObjectInPropagatedObjects decides if any of the objects in given list are
* HasAnyDependencyInPropagatedObjects decides if any dependency of given object is
* propagated in the current transaction.
*/
bool
HasAnyObjectInPropagatedObjects(List *objectList)
HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
{
ObjectAddress *object = NULL;
foreach_ptr(object, objectList)
List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
{
/* first search in root transaction */
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, object))
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency))
{
return true;
}
@ -1162,7 +1163,7 @@ HasAnyObjectInPropagatedObjects(List *objectList)
SubXactContext *state = NULL;
foreach_ptr(state, activeSubXactContexts)
{
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, object))
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency))
{
return true;
}

View File

@ -386,7 +386,6 @@ extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operat
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern void UndistributeTables(List *relationIdList);
extern void EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target);
extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets);
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
ObjectAddress *

View File

@ -163,7 +163,7 @@ extern bool MaybeExecutingUDF(void);
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTableAndSequences(Oid relationId);
extern void ResetPropagatedObjects(void);
extern bool HasAnyObjectInPropagatedObjects(List *objectList);
extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);