/*-------------------------------------------------------------------------
 *
 * dependencies.c
 *    Commands to create dependencies of an object on all workers.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"

#include "catalog/dependency.h"
#include "catalog/objectaddress.h"
#include "commands/extension.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"

#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"


static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b);
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);


/*
 * EnsureDependenciesExistOnAllNodes collects every supported dependency of
 * the target and makes sure each of them is available on all workers. Missing
 * objects are created over a separate session that commits right away, so
 * that they are visible to the (potentially many) sessions creating shards.
 *
 * Only the objects themselves go through that separate session; the
 * pg_dist_object records are written in the current session. Consequently an
 * object may exist on a worker without a catalog entry, and changes made to
 * it on the coordinator are not propagated until the record becomes visible
 * on the coordinator.
 *
 * To cope with that, dependencies are created idempotently, either through
 * postgres' native CREATE IF NOT EXISTS or through citus helper functions.
 */
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
	List *propagatedDependencies = NIL;
	List *commandList = NIL;

	/*
	 * Citus cannot guarantee the dependencies on all nodes when one of them
	 * is unsupported or when the target depends on itself; this errors out
	 * in those cases.
	 */
	EnsureDependenciesCanBeDistributed(target);

	/* walk the dependencies in creation order, gathering their ddl commands */
	List *dependencyList = GetDependenciesForObject(target);
	ObjectAddress *dependency = NULL;
	foreach_ptr(dependency, dependencyList)
	{
		List *createCommands = GetDependencyCreateDDLCommands(dependency);
		commandList = list_concat(commandList, createCommands);

		/* remember only the dependencies that produced at least one command */
		if (list_length(createCommands) > 0)
		{
			propagatedDependencies = lappend(propagatedDependencies, dependency);
		}
	}

	if (list_length(commandList) == 0)
	{
		/* nothing has to be executed on the workers */
		return;
	}

	/* we are sending ddl commands, so switch off propagation first (mainly for mx) */
	List *disablePropagationCommand = list_make1(DISABLE_DDL_PROPAGATION);
	commandList = list_concat(disablePropagationCommand, commandList);

	/*
	 * Take a RowShareLock on pg_dist_node so that no node can be added until
	 * this transaction ends; it conflicts with the ExclusiveLock taken by
	 * citus_add_node. Every active node is thereby guaranteed to have the
	 * object: it either receives it now, or receives it in citus_add_node once
	 * this transaction has finished and the pg_dist_object record is visible.
	 */
	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock);

	/*
	 * Explicitly lock the dependent objects so that parallel sessions do not
	 * send the same DDL command several times.
	 *
	 * The dependencies are sorted before locking so that sessions creating
	 * different objects do not deadlock each other.
	 */
	List *lockOrderedDependencies = SortList(propagatedDependencies,
											 ObjectAddressComparator);
	ObjectAddress *lockedDependency = NULL;
	foreach_ptr(lockedDependency, lockOrderedDependencies)
	{
		LockDatabaseObject(lockedDependency->classId,
						   lockedDependency->objectId,
						   lockedDependency->objectSubId,
						   ExclusiveLock);
	}

	/*
	 * When any dependency of the target was created in the current
	 * transaction, the dependencies have to travel over the current user's
	 * metadata connection. The assumption is that a user relying on such a
	 * dependency most probably has the permissions to create the target too.
	 * The user may still lack permissions on one of the dependencies, but
	 * that should be rare and is acceptable.
	 *
	 * A separate superuser connection would not see the dependencies
	 * propagated in this transaction until we commit locally, hence it is
	 * only used when no such dependency exists.
	 */
	if (!HasAnyDependencyInPropagatedObjects(target))
	{
		WorkerNode *worker = NULL;
		foreach_ptr(worker, workerNodeList)
		{
			SendCommandListToWorkerOutsideTransaction(worker->workerName,
													  worker->workerPort,
													  CitusExtensionOwnerName(),
													  commandList);
		}
	}
	else
	{
		SendCommandListToWorkersWithMetadata(commandList);
	}

	/*
	 * Objects are marked distributed only after they were created on the
	 * workers, so that MarkObjectDistributed does not fail.
	 */
	ObjectAddress *createdDependency = NULL;
	foreach_ptr(createdDependency, propagatedDependencies)
	{
		/*
		 * The pg_dist_object entries go out as the super user: the owner of
		 * the target may not own its dependencies, yet the objects themselves
		 * were sent as the superuser as well.
		 *
		 * This holds for the metadata of dependent objects only. Metadata of
		 * the table itself must be propagated with the current user.
		 */
		MarkObjectDistributedViaSuperUser(createdDependency);
	}
}


/*
 * EnsureAllObjectDependenciesExistOnAllNodes runs
 * EnsureDependenciesExistOnAllNodes for each of the given targets, in list
 * order.
 */
void
EnsureAllObjectDependenciesExistOnAllNodes(const List *targets)
{
	ObjectAddress *currentTarget = NULL;
	foreach_ptr(currentTarget, targets)
	{
		EnsureDependenciesExistOnAllNodes(currentTarget);
	}
}


/*
 * EnsureDependenciesCanBeDistributed errors out unless every dependency of
 * the given object can be distributed.
 */
static void
EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress)
{
	/* an object that circularly depends on itself cannot be handled */
	ErrorIfCircularDependencyExists(objectAddress);

	/* neither can an object with an unsupported dependency */
	List *addressList = list_make1((ObjectAddress *) objectAddress);
	DeferredErrorMessage *unsupportedError =
		DeferErrorIfAnyObjectHasUnsupportedDependency(addressList);
	if (unsupportedError == NULL)
	{
		return;
	}

	/* the detail of the deferred error is not applicable here, drop it */
	unsupportedError->detail = NULL;
	RaiseDeferredError(unsupportedError, ERROR);
}


/*
 * ErrorIfCircularDependencyExists raises, at ERROR level, the deferred error
 * returned by DeferErrorIfCircularDependencyExists(), if there is one.
 */
static void
ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress)
{
	DeferredErrorMessage *circularError =
		DeferErrorIfCircularDependencyExists(objectAddress);
	if (circularError == NULL)
	{
		return;
	}

	RaiseDeferredError(circularError, ERROR);
}


/*
 * DeferErrorIfCircularDependencyExists checks whether given object has
 * circular dependency with itself. If so, returns a deferred error.
*/ DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const ObjectAddress *objectAddress) { List *dependencies = GetAllDependenciesForObject(objectAddress); ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { if (dependency->classId == objectAddress->classId && dependency->objectId == objectAddress->objectId && dependency->objectSubId == objectAddress->objectSubId) { char *objectDescription = getObjectDescription(objectAddress, false); StringInfo detailInfo = makeStringInfo(); appendStringInfo(detailInfo, "\"%s\" circularly depends itself, resolve " "circular dependency first", objectDescription); return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "Citus can not handle circular dependencies " "between distributed objects", detailInfo->data, NULL); } } return NULL; } /* * Copied from PG object_address_comparator function to compare ObjectAddresses. */ static int ObjectAddressComparator(const void *a, const void *b) { const ObjectAddress *obja = (const ObjectAddress *) a; const ObjectAddress *objb = (const ObjectAddress *) b; /* * Primary sort key is OID descending. */ if (obja->objectId > objb->objectId) { return -1; } if (obja->objectId < objb->objectId) { return 1; } /* * Next sort on catalog ID, in case identical OIDs appear in different * catalogs. Sort direction is pretty arbitrary here. */ if (obja->classId < objb->classId) { return -1; } if (obja->classId > objb->classId) { return 1; } /* * Last, sort on object subId. */ if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId) { return -1; } if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId) { return 1; } return 0; } /* * GetDistributableDependenciesForObject finds all the dependencies that Citus * can distribute and returns those dependencies regardless of their existency * on nodes. 
*/
List *
GetDistributableDependenciesForObject(const ObjectAddress *target)
{
	List *distributableList = NIL;

	/* dependencies come back in creation order */
	List *allDependencies = GetDependenciesForObject(target);

	ObjectAddress *candidate = NULL;
	foreach_ptr(candidate, allDependencies)
	{
		/*
		 * TODO: this could be optimized. The ddl commands themselves are not
		 * needed, since we do not ensure existence on the nodes here; they
		 * are generated only to reuse the logic that decides which objects
		 * can be distributed.
		 */
		List *createCommands = GetDependencyCreateDDLCommands(candidate);

		/* a dependency without any command is not kept */
		if (list_length(createCommands) <= 0)
		{
			continue;
		}

		distributableList = lappend(distributableList, candidate);
	}

	return distributableList;
}


/*
 * DropTableIfExistsCommand builds the "DROP TABLE IF EXISTS ... CASCADE"
 * command for the given relation, using its qualified name.
 */
static char *
DropTableIfExistsCommand(Oid relationId)
{
	char *qualifiedName = generate_qualified_relation_name(relationId);

	StringInfo commandBuffer = makeStringInfo();
	appendStringInfo(commandBuffer, "DROP TABLE IF EXISTS %s CASCADE",
					 qualifiedName);

	return commandBuffer->data;
}


/*
 * GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
 * commands to execute on a worker to create the object.
*/ static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency) { switch (getObjectClass(dependency)) { case OCLASS_CLASS: { char relKind = get_rel_relkind(dependency->objectId); /* * types have an intermediate dependency on a relation (aka class), so we do * support classes when the relkind is composite */ if (relKind == RELKIND_COMPOSITE_TYPE) { return NIL; } /* * Indices are created separately, however, they do show up in the dependency * list for a table since they will have potentially their own dependencies. * The commands will be added to both shards and metadata tables via the table * creation commands. */ if (relKind == RELKIND_INDEX || relKind == RELKIND_PARTITIONED_INDEX) { return NIL; } if (relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE || relKind == RELKIND_FOREIGN_TABLE) { Oid relationId = dependency->objectId; List *commandList = NIL; if (IsCitusTable(relationId)) { bool creatingShellTableOnRemoteNode = true; List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS, INCLUDE_IDENTITY, creatingShellTableOnRemoteNode); TableDDLCommand *tableDDLCommand = NULL; foreach_ptr(tableDDLCommand, tableDDLCommands) { Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); commandList = lappend(commandList, GetTableDDLCommand( tableDDLCommand)); } /* * We need to drop table, if exists, first to make table creation * idempotent. Before dropping the table, we should also break * dependencies with sequences since `drop cascade table` would also * drop depended sequences. This is safe as we still record dependency * with the sequence during table creation. 
*/ commandList = lcons(DropTableIfExistsCommand(relationId), commandList); commandList = lcons(WorkerDropSequenceDependencyCommand(relationId), commandList); } return commandList; } if (relKind == RELKIND_SEQUENCE) { char *sequenceOwnerName = TableOwner(dependency->objectId); return DDLCommandsForSequence(dependency->objectId, sequenceOwnerName); } if (relKind == RELKIND_VIEW) { char *createViewCommand = CreateViewDDLCommand(dependency->objectId); char *alterViewOwnerCommand = AlterViewOwnerCommand(dependency->objectId); return list_make2(createViewCommand, alterViewOwnerCommand); } /* if this relation is not supported, break to the error at the end */ break; } case OCLASS_COLLATION: { return CreateCollationDDLsIdempotent(dependency->objectId); } case OCLASS_CONSTRAINT: { /* * Constraints can only be reached by domains, they resolve functions. * Constraints themself are recreated by the domain recreation. */ return NIL; } case OCLASS_DATABASE: { List *databaseDDLCommands = NIL; /* only propagate the ownership of the database when the feature is on */ if (EnableAlterDatabaseOwner) { List *ownerDDLCommands = DatabaseOwnerDDLCommands(dependency); databaseDDLCommands = list_concat(databaseDDLCommands, ownerDDLCommands); } return databaseDDLCommands; } case OCLASS_PROC: { List *DDLCommands = CreateFunctionDDLCommandsIdempotent(dependency); List *grantDDLCommands = GrantOnFunctionDDLCommands(dependency->objectId); DDLCommands = list_concat(DDLCommands, grantDDLCommands); return DDLCommands; } case OCLASS_PUBLICATION: { return CreatePublicationDDLCommandsIdempotent(dependency); } case OCLASS_ROLE: { return GenerateCreateOrAlterRoleCommand(dependency->objectId); } case OCLASS_SCHEMA: { char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId); List *DDLCommands = list_make1(schemaDDLCommand); List *grantDDLCommands = GrantOnSchemaDDLCommands(dependency->objectId); DDLCommands = list_concat(DDLCommands, grantDDLCommands); return DDLCommands; } case 
OCLASS_TSCONFIG: { return CreateTextSearchConfigDDLCommandsIdempotent(dependency); } case OCLASS_TSDICT: { return CreateTextSearchDictDDLCommandsIdempotent(dependency); } case OCLASS_TYPE: { return CreateTypeDDLCommandsIdempotent(dependency); } case OCLASS_EXTENSION: { return CreateExtensionDDLCommand(dependency); } case OCLASS_FOREIGN_SERVER: { Oid serverId = dependency->objectId; List *DDLCommands = GetForeignServerCreateDDLCommand(serverId); List *grantDDLCommands = GrantOnForeignServerDDLCommands(serverId); DDLCommands = list_concat(DDLCommands, grantDDLCommands); return DDLCommands; } default: { break; } } /* * make sure it fails hard when in debug mode, leave a hint for the user if this ever * happens in production */ Assert(false); ereport(ERROR, (errmsg("unsupported object %s for distribution by citus", getObjectTypeDescription(dependency, /* missingOk: */ false)), errdetail( "citus tries to recreate an unsupported object on its workers"), errhint("please report a bug as this should not be happening"))); } /* * GetAllDependencyCreateDDLCommands iteratively calls GetDependencyCreateDDLCommands * for given dependencies. */ List * GetAllDependencyCreateDDLCommands(const List *dependencies) { List *commands = NIL; ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { commands = list_concat(commands, GetDependencyCreateDDLCommands(dependency)); } return commands; } /* * ShouldPropagate determines if we should be propagating anything */ bool ShouldPropagate(void) { if (creating_extension) { /* * extensions should be created separately on the workers, types cascading from an * extension should therefore not be propagated. 
*/ return false; } if (!EnableMetadataSync) { /* * we are configured to disable object propagation, should not propagate anything */ return false; } return true; } /* * ShouldPropagateCreateInCoordinatedTransction returns based the current state of the * session and policies if Citus needs to propagate the creation of new objects. * * Creation of objects on other nodes could be postponed till the object is actually used * in a sharded object (eg. distributed table or index on a distributed table). In certain * use cases the opportunity for parallelism in a transaction block is preferred. When * configured like that the creation of an object might be postponed and backfilled till * the object is actually used. */ bool ShouldPropagateCreateInCoordinatedTransction() { if (!IsMultiStatementTransaction()) { /* * If we are in a single statement transaction we will always propagate the * creation of objects. There are no downsides in regard to performance or * transactional limitations. These only arise with transaction blocks consisting * of multiple statements. */ return true; } if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { /* * If we are in a transaction that is already switched to sequential, either by * the user, or automatically by an other command, we will always propagate the * creation of new objects to the workers. * * This guarantees no strange anomalies when the transaction aborts or on * visibility of the newly created object. */ return true; } switch (CreateObjectPropagationMode) { case CREATE_OBJECT_PROPAGATION_DEFERRED: { /* * We prefer parallelism at this point. Since we did not already return while * checking for sequential mode we are still in parallel mode. We don't want * to switch that now, thus not propagating the creation. */ return false; } case CREATE_OBJECT_PROPAGATION_AUTOMATIC: { /* * When we run in optimistic mode we want to switch to sequential mode, only * if this would _not_ give an error to the user. 
Meaning, we either are * already in sequential mode (checked earlier), or there has been no parallel * execution in the current transaction block. * * If switching to sequential would throw an error we would stay in parallel * mode while creating new objects. We will rely on Citus' mechanism to ensure * the existence if the object would be used in the same transaction. */ if (ParallelQueryExecutedInTransaction()) { return false; } return true; } case CREATE_OBJECT_PROPAGATION_IMMEDIATE: { return true; } default: { elog(ERROR, "unsupported ddl propagation mode"); } } } /* * ShouldPropagateObject determines if we should be propagating DDLs based * on their object address. */ static bool ShouldPropagateObject(const ObjectAddress *address) { if (!ShouldPropagate()) { return false; } if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) address))) { /* do not propagate for non-distributed types */ return false; } return true; } /* * ShouldPropagateAnyObject determines if we should be propagating DDLs based * on their object addresses. */ bool ShouldPropagateAnyObject(List *addresses) { ObjectAddress *address = NULL; foreach_ptr(address, addresses) { if (ShouldPropagateObject(address)) { return true; } } return false; } /* * FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list * only containing the ObjectAddress *'s for which the predicate returned true. */ List * FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate) { List *result = NIL; ObjectAddress *address = NULL; foreach_ptr(address, objectAddressList) { if (predicate(address)) { result = lappend(result, address); } } return result; }