From 5e4c0e4bea4dd691bb2f3e04048c7de21d6e2ae9 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 18 May 2022 16:30:31 +0200 Subject: [PATCH] Merge pull request #5931 from citusdata/refactor/dedupe-object-propagation Refactor: reduce complexity and code duplication for Object Propagation --- src/backend/distributed/commands/aggregate.c | 89 --- src/backend/distributed/commands/collation.c | 350 --------- src/backend/distributed/commands/common.c | 274 +++++++ src/backend/distributed/commands/database.c | 69 -- .../distributed/commands/dependencies.c | 3 +- .../commands/distribute_object_ops.c | 226 +++--- src/backend/distributed/commands/domain.c | 374 ---------- .../distributed/commands/foreign_server.c | 286 +------ src/backend/distributed/commands/function.c | 252 ------- src/backend/distributed/commands/schema.c | 37 - .../distributed/commands/text_search.c | 705 ------------------ src/backend/distributed/commands/type.c | 548 -------------- .../distributed/commands/utility_hook.c | 14 + src/include/distributed/commands.h | 173 +---- src/include/distributed/metadata_utility.h | 1 + .../expected/propagate_foreign_servers.out | 5 +- .../regress/sql/propagate_foreign_servers.sql | 3 +- 17 files changed, 488 insertions(+), 2921 deletions(-) delete mode 100644 src/backend/distributed/commands/aggregate.c create mode 100644 src/backend/distributed/commands/common.c diff --git a/src/backend/distributed/commands/aggregate.c b/src/backend/distributed/commands/aggregate.c deleted file mode 100644 index 3e6de88e5..000000000 --- a/src/backend/distributed/commands/aggregate.c +++ /dev/null @@ -1,89 +0,0 @@ -/*------------------------------------------------------------------------- - * - * aggregate.c - * Commands for distributing AGGREGATE statements. - * - * Copyright (c) Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "distributed/commands.h" -#include "distributed/commands/utility_hook.h" -#include "distributed/deparser.h" -#include "distributed/listutils.h" -#include "distributed/metadata/dependency.h" -#include "distributed/metadata_sync.h" -#include "distributed/metadata/distobject.h" -#include "distributed/multi_executor.h" -#include "nodes/parsenodes.h" -#include "utils/lsyscache.h" - - -/* - * PreprocessDefineAggregateStmt only qualifies the node with schema name. - * We will handle the rest in the Postprocess phase. - */ -List * -PreprocessDefineAggregateStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - QualifyTreeNode((Node *) node); - - return NIL; -} - - -/* - * PostprocessDefineAggregateStmt actually creates the plan we need to execute for - * aggregate propagation. - * This is the downside of using the locally created aggregate to get the sql statement. - * - * If the aggregate depends on any non-distributed relation, Citus can not distribute it. - * In order to not to prevent users from creating local aggregates on the coordinator, - * a WARNING message will be sent to the user about the case instead of erroring out. - * - * Besides creating the plan we also make sure all (new) dependencies of the aggregate - * are created on all nodes. - */ -List * -PostprocessDefineAggregateStmt(Node *node, const char *queryString) -{ - DefineStmt *stmt = castNode(DefineStmt, node); - - if (!ShouldPropagate()) - { - return NIL; - } - - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - - EnsureCoordinator(); - - EnsureSequentialMode(OBJECT_AGGREGATE); - - /* If the aggregate has any unsupported dependency, create it locally */ - DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(&address); - - if (depError != NULL) - { - RaiseDeferredError(depError, WARNING); - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&address); - - List *commands = CreateFunctionDDLCommandsIdempotent(&address); - - commands = lcons(DISABLE_DDL_PROPAGATION, commands); - commands = lappend(commands, ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index c284404ce..8ba9dea9b 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -36,9 +36,6 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollationName); -static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok, - List **addresses); -static bool ShouldPropagateDefineCollationStmt(void); /* * GetCreateCollationDDLInternal returns a CREATE COLLATE sql string for the @@ -162,267 +159,6 @@ AlterCollationOwnerObjectAddress(Node *node, bool missing_ok) } -/* - * FilterNameListForDistributedCollations takes a list of objects to delete. - * This list is filtered against the collations that are distributed. - * - * The original list will not be touched, a new list will be created with only the objects - * in there. - * - * objectAddresses is replaced with a list of object addresses for the filtered objects. - */ -static List * -FilterNameListForDistributedCollations(List *objects, bool missing_ok, - List **objectAddresses) -{ - List *result = NIL; - - *objectAddresses = NIL; - - List *collName = NULL; - foreach_ptr(collName, objects) - { - Oid collOid = get_collation_oid(collName, true); - ObjectAddress collAddress = { 0 }; - - if (!OidIsValid(collOid)) - { - continue; - } - - ObjectAddressSet(collAddress, CollationRelationId, collOid); - if (IsObjectDistributed(&collAddress)) - { - ObjectAddress *address = palloc0(sizeof(ObjectAddress)); - *address = collAddress; - *objectAddresses = lappend(*objectAddresses, address); - result = lappend(result, collName); - } - } - return result; -} - - -List * -PreprocessDropCollationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - - /* - * We swap the list of objects to remove during deparse so we need a reference back to - * the old list to put back - */ - List *distributedTypeAddresses = NIL; - - if (!ShouldPropagate()) - { - return NIL; - } - - QualifyTreeNode((Node *) stmt); - - List *oldCollations = stmt->objects; - List *distributedCollations = - FilterNameListForDistributedCollations(oldCollations, stmt->missing_ok, - &distributedTypeAddresses); - if (list_length(distributedCollations) <= 0) - { - /* no distributed types to drop */ - return NIL; - } - - /* - * managing collations can only be done on the coordinator if ddl propagation is on. when - * it is off we will never get here. MX workers don't have a notion of distributed - * collations, so we block the call. - */ - EnsureCoordinator(); - - /* - * remove the entries for the distributed objects on dropping - */ - ObjectAddress *addressItem = NULL; - foreach_ptr(addressItem, distributedTypeAddresses) - { - UnmarkObjectDistributed(addressItem); - } - - /* - * temporary swap the lists of objects to delete with the distributed objects and - * deparse to an executable sql statement for the workers - */ - stmt->objects = distributedCollations; - char *dropStmtSql = DeparseTreeNode((Node *) stmt); - stmt->objects = oldCollations; - - EnsureSequentialMode(OBJECT_COLLATION); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterCollationOwnerStmt is called for change of ownership of collations - * before the ownership is changed on the local instance. - * - * If the type for which the owner is changed is distributed we execute the change on all - * the workers to keep the type in sync across the cluster. - */ -List * -PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_COLLATION); - - ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&collationAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - QualifyTreeNode((Node *) stmt); - char *sql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_COLLATION); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterCollationOwnerStmt is invoked after the owner has been changed locally. - * Since changing the owner could result in new dependencies being found for this object - * we re-ensure all the dependencies for the collation do exist. - * - * This is solely to propagate the new owner (and all its dependencies) if it was not - * already distributed in the cluster. - */ -List * -PostprocessAlterCollationOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_COLLATION); - - ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&collationAddress)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&collationAddress); - - return NIL; -} - - -/* - * PreprocessRenameCollationStmt is called when the user is renaming the collation. The invocation happens - * before the statement is applied locally. - * - * As the collation already exists we have access to the ObjectAddress for the collation, this is - * used to check if the collation is distributed. If the collation is distributed the rename is - * executed on all the workers to keep the collation in sync across the cluster. - */ -List * -PreprocessRenameCollationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&collationAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - /* fully qualify */ - QualifyTreeNode((Node *) stmt); - - /* deparse sql*/ - char *renameStmtSql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_COLLATION); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) renameStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterCollationSchemaStmt is executed before the statement is applied to the local - * postgres instance. - * - * In this stage we can prepare the commands that need to be run on all workers. - */ -List * -PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_COLLATION); - - ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&collationAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - QualifyTreeNode((Node *) stmt); - char *sql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_COLLATION); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterCollationSchemaStmt is executed after the change has been applied locally, we - * can now use the new dependencies of the type to ensure all its dependencies exist on - * the workers before we apply the commands remotely. - */ -List * -PostprocessAlterCollationSchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_COLLATION); - - ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&collationAddress)) - { - return NIL; - } - - /* dependencies have changed (schema) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&collationAddress); - - return NIL; -} - - /* * RenameCollationStmtObjectAddress returns the ObjectAddress of the type that is the object * of the RenameStmt. Errors if missing_ok is false. @@ -544,89 +280,3 @@ DefineCollationStmtObjectAddress(Node *node, bool missing_ok) return address; } - - -/* - * PreprocessDefineCollationStmt executed before the collation has been - * created locally to ensure that if the collation create statement will - * be propagated, the node is a coordinator node - */ -List * -PreprocessDefineCollationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION); - - if (!ShouldPropagateDefineCollationStmt()) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_COLLATION); - - return NIL; -} - - -/* - * PostprocessDefineCollationStmt executed after the collation has been - * created locally and before we create it on the worker nodes. - * As we now have access to ObjectAddress of the collation that is just - * created, we can mark it as distributed to make sure that its - * dependencies exist on all nodes. - */ -List * -PostprocessDefineCollationStmt(Node *node, const char *queryString) -{ - Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION); - - if (!ShouldPropagateDefineCollationStmt()) - { - return NIL; - } - - ObjectAddress collationAddress = - DefineCollationStmtObjectAddress(node, false); - - DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency( - &collationAddress); - if (errMsg != NULL) - { - RaiseDeferredError(errMsg, WARNING); - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&collationAddress); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make1(DISABLE_DDL_PROPAGATION); - commands = list_concat(commands, CreateCollationDDLsIdempotent( - collationAddress.objectId)); - commands = lappend(commands, ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * ShouldPropagateDefineCollationStmt checks if collation define - * statement should be propagated. Don't propagate if: - * - metadata syncing if off - * - create statement should be propagated according the the ddl propagation policy - */ -static bool -ShouldPropagateDefineCollationStmt() -{ - if (!ShouldPropagate()) - { - return false; - } - - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return false; - } - - return true; -} diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c new file mode 100644 index 000000000..1c6a71de3 --- /dev/null +++ b/src/backend/distributed/commands/common.c @@ -0,0 +1,274 @@ +/*------------------------------------------------------------------------- + * + * common.c + * + * Most of the object propagation code consists of mostly the same + * operations, varying slightly in parameters passed around. This + * file contains most of the reusable logic in object propagation. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/objectaddress.h" +#include "nodes/parsenodes.h" +#include "tcop/utility.h" + +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/dependency.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "distributed/worker_transaction.h" + + +/* + * PostprocessCreateDistributedObjectFromCatalogStmt is a common function that can be used + * for most objects during their creation phase. After the creation has happened locally + * this function creates idempotent statements to recreate the object addressed by the + * ObjectAddress of resolved from the creation statement. + * + * Since object already need to be able to create idempotent creation sql to support + * scaleout operations we can reuse this logic during the initial creation of the objects + * to reduce the complexity of implementation of new DDL commands. + */ +List * +PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryString) +{ + const DistributeObjectOps *ops = GetDistributeObjectOps(stmt); + Assert(ops != NULL); + + if (!ShouldPropagate()) + { + return NIL; + } + + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + if (ops->featureFlag && *ops->featureFlag == false) + { + /* not propagating when a configured feature flag is turned off by the user */ + return NIL; + } + + ObjectAddress address = GetObjectAddressFromParseTree(stmt, false); + + EnsureCoordinator(); + EnsureSequentialMode(ops->objectType); + + /* If the object has any unsupported dependency warn, and only create locally */ + DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(&address); + if (depError != NULL) + { + RaiseDeferredError(depError, WARNING); + return NIL; + } + + EnsureDependenciesExistOnAllNodes(&address); + + List *commands = GetDependencyCreateDDLCommands(&address); + + commands = lcons(DISABLE_DDL_PROPAGATION, commands); + commands = lappend(commands, ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * PreprocessAlterDistributedObjectStmt handles any updates to distributed objects by + * creating the fully qualified sql to apply to all workers after checking all + * predconditions that apply to propagating changes. + * + * Preconditions are (in order): + * - not in a CREATE/ALTER EXTENSION code block + * - citus.enable_metadata_sync is turned on + * - object being altered is distributed + * - any object specific feature flag is turned on when a feature flag is available + * + * Once we conclude to propagate the changes to the workers we make sure that the command + * has been executed on the coordinator and force any ongoing transaction to run in + * sequential mode. If any of these steps fail we raise an error to inform the user. + * + * Lastly we recreate a fully qualified version of the original sql and prepare the tasks + * to send these sql commands to the workers. These tasks include instructions to prevent + * recursion of propagation with Citus' MX functionality. + */ +List * +PreprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + const DistributeObjectOps *ops = GetDistributeObjectOps(stmt); + Assert(ops != NULL); + + ObjectAddress address = GetObjectAddressFromParseTree(stmt, false); + if (!ShouldPropagateObject(&address)) + { + return NIL; + } + + if (ops->featureFlag && *ops->featureFlag == false) + { + /* not propagating when a configured feature flag is turned off by the user */ + return NIL; + } + + EnsureCoordinator(); + EnsureSequentialMode(ops->objectType); + + QualifyTreeNode(stmt); + const char *sql = DeparseTreeNode((Node *) stmt); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * PostprocessAlterDistributedObjectStmt is the counter part of + * PreprocessAlterDistributedObjectStmt that should be executed after the object has been + * changed locally. + * + * We perform the same precondition checks as before to skip this operation if any of the + * failed during preprocessing. Since we already raised an error on other checks we don't + * have to repeat them here, as they will never fail during postprocessing. + * + * When objects get altered they can start depending on undistributed objects. Now that + * the objects has been changed locally we can find these new dependencies and make sure + * they get created on the workers before we send the command list to the workers. + */ +List * +PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString) +{ + const DistributeObjectOps *ops = GetDistributeObjectOps(stmt); + Assert(ops != NULL); + + ObjectAddress address = GetObjectAddressFromParseTree(stmt, false); + if (!ShouldPropagateObject(&address)) + { + return NIL; + } + + if (ops->featureFlag && *ops->featureFlag == false) + { + /* not propagating when a configured feature flag is turned off by the user */ + return NIL; + } + + EnsureDependenciesExistOnAllNodes(&address); + + return NIL; +} + + +/* + * PreprocessDropDistributedObjectStmt is a general purpose hook that can propagate any + * DROP statement. + * + * DROP statements are one of the few DDL statements that can work on many different + * objects at once. Instead of resolving just one ObjectAddress and check it is + * distributed we will need to lookup many different object addresses. Only if an object + * was _not_ distributed we will need to remove it from the list of objects before we + * recreate the sql statement. + * + * Given that we actually _do_ need to drop them locally we can't simply remove them from + * the object list. Instead we create a new list where we only add distributed objects to. + * Before we recreate the sql statement we put this list on the drop statement, so that + * the SQL created will only contain the objects that are actually distributed in the + * cluster. After we have the SQL we restore the old list so that all objects get deleted + * locally. + * + * The reason we need to go through all this effort is taht we can't resolve the object + * addresses anymore after the objects have been removed locally. Meaning during the + * postprocessing we cannot understand which objects were distributed to begin with. + */ +List * +PreprocessDropDistributedObjectStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + DropStmt *stmt = castNode(DropStmt, node); + + /* + * We swap the list of objects to remove during deparse so we need a reference back to + * the old list to put back + */ + List *originalObjects = stmt->objects; + + if (!ShouldPropagate()) + { + return NIL; + } + + QualifyTreeNode(node); + + List *distributedObjects = NIL; + List *distributedObjectAddresses = NIL; + Node *object = NULL; + foreach_ptr(object, stmt->objects) + { + /* TODO understand if the lock should be sth else */ + Relation rel = NULL; /* not used, but required to pass to get_object_address */ + ObjectAddress address = get_object_address(stmt->removeType, object, &rel, + AccessShareLock, stmt->missing_ok); + if (IsObjectDistributed(&address)) + { + ObjectAddress *addressPtr = palloc0(sizeof(ObjectAddress)); + *addressPtr = address; + + distributedObjects = lappend(distributedObjects, object); + distributedObjectAddresses = lappend(distributedObjectAddresses, addressPtr); + } + } + + if (list_length(distributedObjects) <= 0) + { + /* no distributed objects to drop */ + return NIL; + } + + /* + * managing objects can only be done on the coordinator if ddl propagation is on. when + * it is off we will never get here. MX workers don't have a notion of distributed + * types, so we block the call. + */ + EnsureCoordinator(); + + /* + * remove the entries for the distributed objects on dropping + */ + ObjectAddress *address = NULL; + foreach_ptr(address, distributedObjectAddresses) + { + UnmarkObjectDistributed(address); + } + + /* + * temporary swap the lists of objects to delete with the distributed objects and + * deparse to an executable sql statement for the workers + */ + stmt->objects = distributedObjects; + char *dropStmtSql = DeparseTreeNode((Node *) stmt); + stmt->objects = originalObjects; + + EnsureSequentialMode(stmt->removeType); + + /* to prevent recursion with mx we disable ddl propagation */ + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + dropStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 59902b038..db1834d69 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -36,75 +36,6 @@ static Oid get_database_owner(Oid db_oid); bool EnableAlterDatabaseOwner = false; -/* - * PreprocessAlterDatabaseOwnerStmt is called during the utility hook before the alter - * command is applied locally on the coordinator. This will verify if the command needs to - * be propagated to the workers and if so prepares a list of ddl commands to execute. - */ -List * -PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_DATABASE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - if (!EnableAlterDatabaseOwner) - { - /* don't propagate if GUC is turned off */ - return NIL; - } - - EnsureCoordinator(); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_DATABASE); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterDatabaseOwnerStmt is called during the utility hook after the alter - * database command has been applied locally. - * - * Its main purpose is to propagate the newly formed dependencies onto the nodes before - * applying the change of owner of the databse. This ensures, for systems that have role - * management, that the roles will be created before applying the alter owner command. - */ -List * -PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_DATABASE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - if (!EnableAlterDatabaseOwner) - { - /* don't propagate if GUC is turned off */ - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&typeAddress); - return NIL; -} - - /* * AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the * object of the AlterOwnerStmt. Errors if missing_ok is false. diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index bf35a13b7..72b49fc5a 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -34,7 +34,6 @@ typedef bool (*AddressPredicate)(const ObjectAddress *); static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress); static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress); static int ObjectAddressComparator(const void *a, const void *b); -static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static List * FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate); @@ -289,7 +288,7 @@ GetDistributableDependenciesForObject(const ObjectAddress *target) * GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl * commands to execute on a worker to create the object. */ -static List * +List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency) { switch (getObjectClass(dependency)) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index d6d3430bb..aaecb3e98 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -16,6 +16,7 @@ #include "distributed/deparser.h" #include "distributed/pg_version_constants.h" #include "distributed/version_compat.h" +#include "distributed/commands/utility_hook.h" static DistributeObjectOps NoDistributeOps = { .deparse = NULL, @@ -28,31 +29,34 @@ static DistributeObjectOps NoDistributeOps = { static DistributeObjectOps Aggregate_AlterObjectSchema = { .deparse = DeparseAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt, - .preprocess = PreprocessAlterFunctionSchemaStmt, - .postprocess = PostprocessAlterFunctionSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Aggregate_AlterOwner = { .deparse = DeparseAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt, - .preprocess = PreprocessAlterFunctionOwnerStmt, - .postprocess = PostprocessAlterFunctionOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Aggregate_Define = { .deparse = NULL, .qualify = QualifyDefineAggregateStmt, - .preprocess = PreprocessDefineAggregateStmt, - .postprocess = PostprocessDefineAggregateStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_AGGREGATE, .address = DefineAggregateStmtObjectAddress, .markDistributed = true, }; static DistributeObjectOps Aggregate_Drop = { .deparse = DeparseDropFunctionStmt, .qualify = NULL, - .preprocess = PreprocessDropFunctionStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -60,16 +64,18 @@ static DistributeObjectOps Aggregate_Drop = { static DistributeObjectOps Aggregate_Rename = { .deparse = DeparseRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt, - .preprocess = PreprocessRenameFunctionStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_FUNCTION, .address = RenameFunctionStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Any_AlterEnum = { .deparse = DeparseAlterEnumStmt, .qualify = QualifyAlterEnumStmt, - .preprocess = PreprocessAlterEnumStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TYPE, .address = AlterEnumStmtObjectAddress, .markDistributed = false, }; @@ -92,9 +98,10 @@ static DistributeObjectOps Any_AlterExtensionContents = { static DistributeObjectOps Any_AlterForeignServer = { .deparse = DeparseAlterForeignServerStmt, .qualify = NULL, - .preprocess = PreprocessAlterForeignServerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, - .address = NULL, + .objectType = OBJECT_FOREIGN_SERVER, + .address = AlterForeignServerStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Any_AlterFunction = { @@ -148,24 +155,29 @@ static DistributeObjectOps Any_Cluster = { static DistributeObjectOps Any_CompositeType = { .deparse = DeparseCompositeTypeStmt, .qualify = QualifyCompositeTypeStmt, - .preprocess = PreprocessCompositeTypeStmt, - .postprocess = PostprocessCompositeTypeStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_TYPE, + .featureFlag = &EnableCreateTypePropagation, .address = CompositeTypeStmtObjectAddress, .markDistributed = true, }; static DistributeObjectOps Any_CreateDomain = { .deparse = DeparseCreateDomainStmt, .qualify = QualifyCreateDomainStmt, - .preprocess = PreprocessCreateDomainStmt, - .postprocess = PostprocessCreateDomainStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_DOMAIN, .address = CreateDomainStmtObjectAddress, .markDistributed = true, }; static DistributeObjectOps Any_CreateEnum = { .deparse = DeparseCreateEnumStmt, .qualify = QualifyCreateEnumStmt, - .preprocess = PreprocessCreateEnumStmt, - .postprocess = PostprocessCreateEnumStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_TYPE, + .featureFlag = &EnableCreateTypePropagation, .address = CreateEnumStmtObjectAddress, .markDistributed = true, }; @@ -196,8 +208,9 @@ static DistributeObjectOps Any_CreatePolicy = { static DistributeObjectOps Any_CreateForeignServer = { .deparse = DeparseCreateForeignServerStmt, .qualify = NULL, - .preprocess = PreprocessCreateForeignServerStmt, - .postprocess = PostprocessCreateForeignServerStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_FOREIGN_SERVER, .address = CreateForeignServerStmtObjectAddress, .markDistributed = true, }; @@ -268,31 +281,34 @@ static DistributeObjectOps Attribute_Rename = { static DistributeObjectOps Collation_AlterObjectSchema = { .deparse = DeparseAlterCollationSchemaStmt, .qualify = QualifyAlterCollationSchemaStmt, - .preprocess = PreprocessAlterCollationSchemaStmt, - .postprocess = PostprocessAlterCollationSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_COLLATION, .address = AlterCollationSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Collation_AlterOwner = { .deparse = DeparseAlterCollationOwnerStmt, .qualify = QualifyAlterCollationOwnerStmt, - .preprocess = PreprocessAlterCollationOwnerStmt, - .postprocess = PostprocessAlterCollationOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_COLLATION, .address = AlterCollationOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Collation_Define = { .deparse = NULL, .qualify = NULL, - .preprocess = PreprocessDefineCollationStmt, - .postprocess = PostprocessDefineCollationStmt, + .preprocess = NULL, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_COLLATION, .address = DefineCollationStmtObjectAddress, .markDistributed = true, }; static DistributeObjectOps Collation_Drop = { .deparse = DeparseDropCollationStmt, .qualify = QualifyDropCollationStmt, - .preprocess = PreprocessDropCollationStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -300,47 +316,53 @@ static DistributeObjectOps Collation_Drop = { static DistributeObjectOps Collation_Rename = { .deparse = DeparseRenameCollationStmt, .qualify = QualifyRenameCollationStmt, - .preprocess = PreprocessRenameCollationStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_COLLATION, .address = RenameCollationStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Database_AlterOwner = { .deparse = DeparseAlterDatabaseOwnerStmt, .qualify = NULL, - .preprocess = PreprocessAlterDatabaseOwnerStmt, - .postprocess = PostprocessAlterDatabaseOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_DATABASE, + .featureFlag = &EnableAlterDatabaseOwner, .address = AlterDatabaseOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Domain_Alter = { .deparse = DeparseAlterDomainStmt, .qualify = QualifyAlterDomainStmt, - .preprocess = PreprocessAlterDomainStmt, - .postprocess = PostprocessAlterDomainStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_DOMAIN, .address = AlterDomainStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Domain_AlterObjectSchema = { .deparse = DeparseAlterDomainSchemaStmt, .qualify = QualifyAlterDomainSchemaStmt, - .preprocess = PreprocessAlterDomainSchemaStmt, - .postprocess = PostprocessAlterDomainSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_DOMAIN, .address = AlterTypeSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Domain_AlterOwner = { .deparse = DeparseAlterDomainOwnerStmt, .qualify = QualifyAlterDomainOwnerStmt, - .preprocess = PreprocessAlterDomainOwnerStmt, - .postprocess = PostprocessAlterDomainOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_DOMAIN, .address = AlterDomainOwnerStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Domain_Drop = { .deparse = DeparseDropDomainStmt, .qualify = QualifyDropDomainStmt, - .preprocess = PreprocessDropDomainStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -348,8 +370,9 @@ static DistributeObjectOps Domain_Drop = { static DistributeObjectOps Domain_Rename = { .deparse = DeparseRenameDomainStmt, .qualify = QualifyRenameDomainStmt, - .preprocess = PreprocessRenameDomainStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_DOMAIN, .address = RenameDomainStmtObjectAddress, .markDistributed = false, }; @@ -357,8 +380,9 @@ static DistributeObjectOps Domain_Rename = { static DistributeObjectOps Domain_RenameConstraint = { .deparse = DeparseDomainRenameConstraintStmt, .qualify = QualifyDomainRenameConstraintStmt, - .preprocess = PreprocessDomainRenameConstraintStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_DOMAIN, .address = DomainRenameConstraintStmtObjectAddress, .markDistributed = false, }; @@ -381,7 +405,7 @@ static DistributeObjectOps Extension_Drop = { static DistributeObjectOps ForeignServer_Drop = { .deparse = DeparseDropForeignServerStmt, .qualify = NULL, - .preprocess = PreprocessDropForeignServerStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -389,16 +413,18 @@ static DistributeObjectOps ForeignServer_Drop = { static DistributeObjectOps ForeignServer_Rename = { .deparse = DeparseAlterForeignServerRenameStmt, .qualify = NULL, - .preprocess = PreprocessRenameForeignServerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, - .address = NULL, + .objectType = OBJECT_FOREIGN_SERVER, + .address = RenameForeignServerStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps ForeignServer_AlterOwner = { .deparse = DeparseAlterForeignServerOwnerStmt, .qualify = NULL, - .preprocess = PreprocessAlterForeignServerOwnerStmt, - .postprocess = PostprocessAlterForeignServerOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FOREIGN_SERVER, .address = AlterForeignServerOwnerStmtObjectAddress, .markDistributed = false, }; @@ -421,23 +447,25 @@ static DistributeObjectOps Function_AlterObjectDepends = { static DistributeObjectOps Function_AlterObjectSchema = { .deparse = DeparseAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt, - .preprocess = PreprocessAlterFunctionSchemaStmt, - .postprocess = PostprocessAlterFunctionSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Function_AlterOwner = { .deparse = DeparseAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt, - .preprocess = PreprocessAlterFunctionOwnerStmt, - .postprocess = PostprocessAlterFunctionOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Function_Drop = { .deparse = DeparseDropFunctionStmt, .qualify = NULL, - .preprocess = PreprocessDropFunctionStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -445,8 +473,9 @@ static DistributeObjectOps Function_Drop = { static DistributeObjectOps Function_Rename = { .deparse = DeparseRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt, - .preprocess = PreprocessRenameFunctionStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_FUNCTION, .address = RenameFunctionStmtObjectAddress, .markDistributed = false, }; @@ -485,23 +514,25 @@ static DistributeObjectOps Procedure_AlterObjectDepends = { static DistributeObjectOps Procedure_AlterObjectSchema = { .deparse = DeparseAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt, - .preprocess = PreprocessAlterFunctionSchemaStmt, - .postprocess = PostprocessAlterFunctionSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Procedure_AlterOwner = { .deparse = DeparseAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt, - .preprocess = PreprocessAlterFunctionOwnerStmt, - .postprocess = PostprocessAlterFunctionOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Procedure_Drop = { .deparse = DeparseDropFunctionStmt, .qualify = NULL, - .preprocess = PreprocessDropFunctionStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -509,8 +540,9 @@ static DistributeObjectOps Procedure_Drop = { static DistributeObjectOps Procedure_Rename = { .deparse = DeparseRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt, - .preprocess = PreprocessRenameFunctionStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_FUNCTION, .address = RenameFunctionStmtObjectAddress, .markDistributed = false, }; @@ -565,32 +597,36 @@ static DistributeObjectOps Sequence_Rename = { static DistributeObjectOps TextSearchConfig_Alter = { .deparse = DeparseAlterTextSearchConfigurationStmt, .qualify = QualifyAlterTextSearchConfigurationStmt, - .preprocess = PreprocessAlterTextSearchConfigurationStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSCONFIGURATION, .address = AlterTextSearchConfigurationStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchConfig_AlterObjectSchema = { .deparse = DeparseAlterTextSearchConfigurationSchemaStmt, .qualify = QualifyAlterTextSearchConfigurationSchemaStmt, - .preprocess = PreprocessAlterTextSearchConfigurationSchemaStmt, - .postprocess = PostprocessAlterTextSearchConfigurationSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TSCONFIGURATION, .address = AlterTextSearchConfigurationSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchConfig_AlterOwner = { .deparse = DeparseAlterTextSearchConfigurationOwnerStmt, .qualify = QualifyAlterTextSearchConfigurationOwnerStmt, - .preprocess = PreprocessAlterTextSearchConfigurationOwnerStmt, - .postprocess = PostprocessAlterTextSearchConfigurationOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TSCONFIGURATION, .address = AlterTextSearchConfigurationOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchConfig_Comment = { .deparse = DeparseTextSearchConfigurationCommentStmt, .qualify = QualifyTextSearchConfigurationCommentStmt, - .preprocess = PreprocessTextSearchConfigurationCommentStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSCONFIGURATION, .address = TextSearchConfigurationCommentObjectAddress, .markDistributed = false, }; @@ -598,14 +634,15 @@ static DistributeObjectOps TextSearchConfig_Define = { .deparse = DeparseCreateTextSearchConfigurationStmt, .qualify = NULL, .preprocess = NULL, - .postprocess = PostprocessCreateTextSearchConfigurationStmt, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_TSCONFIGURATION, .address = CreateTextSearchConfigurationObjectAddress, .markDistributed = true, }; static DistributeObjectOps TextSearchConfig_Drop = { .deparse = DeparseDropTextSearchConfigurationStmt, .qualify = QualifyDropTextSearchConfigurationStmt, - .preprocess = PreprocessDropTextSearchConfigurationStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -613,40 +650,45 @@ static DistributeObjectOps TextSearchConfig_Drop = { static DistributeObjectOps TextSearchConfig_Rename = { .deparse = DeparseRenameTextSearchConfigurationStmt, .qualify = QualifyRenameTextSearchConfigurationStmt, - .preprocess = PreprocessRenameTextSearchConfigurationStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSCONFIGURATION, .address = RenameTextSearchConfigurationStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchDict_Alter = { .deparse = DeparseAlterTextSearchDictionaryStmt, .qualify = QualifyAlterTextSearchDictionaryStmt, - .preprocess = PreprocessAlterTextSearchDictionaryStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSDICTIONARY, .address = AlterTextSearchDictionaryStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchDict_AlterObjectSchema = { .deparse = DeparseAlterTextSearchDictionarySchemaStmt, .qualify = QualifyAlterTextSearchDictionarySchemaStmt, - .preprocess = PreprocessAlterTextSearchDictionarySchemaStmt, - .postprocess = PostprocessAlterTextSearchDictionarySchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TSDICTIONARY, .address = AlterTextSearchDictionarySchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchDict_AlterOwner = { .deparse = DeparseAlterTextSearchDictionaryOwnerStmt, .qualify = QualifyAlterTextSearchDictionaryOwnerStmt, - .preprocess = PreprocessAlterTextSearchDictionaryOwnerStmt, - .postprocess = PostprocessAlterTextSearchDictionaryOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TSDICTIONARY, .address = AlterTextSearchDictOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps TextSearchDict_Comment = { .deparse = DeparseTextSearchDictionaryCommentStmt, .qualify = QualifyTextSearchDictionaryCommentStmt, - .preprocess = PreprocessTextSearchDictionaryCommentStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSDICTIONARY, .address = TextSearchDictCommentObjectAddress, .markDistributed = false, }; @@ -654,14 +696,15 @@ static DistributeObjectOps TextSearchDict_Define = { .deparse = DeparseCreateTextSearchDictionaryStmt, .qualify = NULL, .preprocess = NULL, - .postprocess = PostprocessCreateTextSearchDictionaryStmt, + .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt, + .objectType = OBJECT_TSDICTIONARY, .address = CreateTextSearchDictObjectAddress, .markDistributed = true, }; static DistributeObjectOps TextSearchDict_Drop = { .deparse = DeparseDropTextSearchDictionaryStmt, .qualify = QualifyDropTextSearchDictionaryStmt, - .preprocess = PreprocessDropTextSearchDictionaryStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -669,8 +712,9 @@ static DistributeObjectOps TextSearchDict_Drop = { static DistributeObjectOps TextSearchDict_Rename = { .deparse = DeparseRenameTextSearchDictionaryStmt, .qualify = QualifyRenameTextSearchDictionaryStmt, - .preprocess = PreprocessRenameTextSearchDictionaryStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TSDICTIONARY, .address = RenameTextSearchDictionaryStmtObjectAddress, .markDistributed = false, }; @@ -685,23 +729,25 @@ static DistributeObjectOps Trigger_AlterObjectDepends = { static DistributeObjectOps Routine_AlterObjectSchema = { .deparse = DeparseAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt, - .preprocess = PreprocessAlterFunctionSchemaStmt, - .postprocess = PostprocessAlterFunctionSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Routine_AlterOwner = { .deparse = DeparseAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt, - .preprocess = PreprocessAlterFunctionOwnerStmt, - .postprocess = PostprocessAlterFunctionOwnerStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_FUNCTION, .address = AlterFunctionOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Routine_Drop = { .deparse = DeparseDropFunctionStmt, .qualify = NULL, - .preprocess = PreprocessDropFunctionStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -709,8 +755,9 @@ static DistributeObjectOps Routine_Drop = { static DistributeObjectOps Routine_Rename = { .deparse = DeparseRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt, - .preprocess = PreprocessRenameFunctionStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_FUNCTION, .address = RenameFunctionStmtObjectAddress, .markDistributed = false, }; @@ -733,8 +780,9 @@ static DistributeObjectOps Schema_Grant = { static DistributeObjectOps Schema_Rename = { .deparse = DeparseAlterSchemaRenameStmt, .qualify = NULL, - .preprocess = PreprocessAlterSchemaRenameStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_SCHEMA, .address = AlterSchemaRenameStmtObjectAddress, .markDistributed = false, }; @@ -807,31 +855,34 @@ static DistributeObjectOps Table_Drop = { static DistributeObjectOps Type_AlterObjectSchema = { .deparse = DeparseAlterTypeSchemaStmt, .qualify = QualifyAlterTypeSchemaStmt, - .preprocess = PreprocessAlterTypeSchemaStmt, - .postprocess = PostprocessAlterTypeSchemaStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TYPE, .address = AlterTypeSchemaStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Type_AlterOwner = { .deparse = DeparseAlterTypeOwnerStmt, .qualify = QualifyAlterTypeOwnerStmt, - .preprocess = PreprocessAlterTypeOwnerStmt, - .postprocess = NULL, + .preprocess = PreprocessAlterDistributedObjectStmt, + .postprocess = PostprocessAlterDistributedObjectStmt, + .objectType = OBJECT_TYPE, .address = AlterTypeOwnerObjectAddress, .markDistributed = false, }; static DistributeObjectOps Type_AlterTable = { .deparse = DeparseAlterTypeStmt, .qualify = QualifyAlterTypeStmt, - .preprocess = PreprocessAlterTypeStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TYPE, .address = AlterTypeStmtObjectAddress, .markDistributed = false, }; static DistributeObjectOps Type_Drop = { .deparse = DeparseDropTypeStmt, .qualify = NULL, - .preprocess = PreprocessDropTypeStmt, + .preprocess = PreprocessDropDistributedObjectStmt, .postprocess = NULL, .address = NULL, .markDistributed = false, @@ -847,8 +898,9 @@ static DistributeObjectOps Trigger_Drop = { static DistributeObjectOps Type_Rename = { .deparse = DeparseRenameTypeStmt, .qualify = QualifyRenameTypeStmt, - .preprocess = PreprocessRenameTypeStmt, + .preprocess = PreprocessAlterDistributedObjectStmt, .postprocess = NULL, + .objectType = OBJECT_TYPE, .address = RenameTypeStmtObjectAddress, .markDistributed = false, }; diff --git a/src/backend/distributed/commands/domain.c b/src/backend/distributed/commands/domain.c index 7e4d5c080..c75af0024 100644 --- a/src/backend/distributed/commands/domain.c +++ b/src/backend/distributed/commands/domain.c @@ -37,382 +37,8 @@ static CollateClause * MakeCollateClauseFromOid(Oid collationOid); -static List * FilterNameListForDistributedDomains(List *domainNames, bool missing_ok, - List **distributedDomainAddresses); static ObjectAddress GetDomainAddressByName(TypeName *domainName, bool missing_ok); -/* - * PreprocessCreateDomainStmt handles the propagation of the create domain statements. - */ -List * -PreprocessCreateDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode(node); - const char *sql = DeparseTreeNode(node); - sql = WrapCreateOrReplace(sql); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessCreateDomainStmt gets called after the domain has been created locally. When - * the domain is decided to be propagated we make sure all the domains dependencies exist - * on all workers. - */ -List * -PostprocessCreateDomainStmt(Node *node, const char *queryString) -{ - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - /* - * find object address of the just created object, because the domain has been created - * locally it can't be missing - */ - ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false); - EnsureDependenciesExistOnAllNodes(&typeAddress); - - return NIL; -} - - -/* - * PreprocessDropDomainStmt gets called before dropping the domain locally. For - * distributed domains it will make sure the fully qualified statement is forwarded to all - * the workers reflecting the drop of the domain. - */ -List * -PreprocessDropDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - - - if (!ShouldPropagate()) - { - return NIL; - } - - QualifyTreeNode((Node *) stmt); - - List *oldDomains = stmt->objects; - List *distributedDomainAddresses = NIL; - List *distributedDomains = FilterNameListForDistributedDomains( - oldDomains, - stmt->missing_ok, - &distributedDomainAddresses); - if (list_length(distributedDomains) <= 0) - { - /* no distributed domains to drop */ - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - ObjectAddress *addressItem = NULL; - foreach_ptr(addressItem, distributedDomainAddresses) - { - UnmarkObjectDistributed(addressItem); - } - - /* - * temporary swap the lists of objects to delete with the distributed objects and - * deparse to an executable sql statement for the workers - */ - stmt->objects = distributedDomains; - char *dropStmtSql = DeparseTreeNode((Node *) stmt); - stmt->objects = oldDomains; - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterDomainStmt gets called for all domain specific alter statements. When - * the change happens on a distributed domain we reflect the changes on the workers. - */ -List * -PreprocessAlterDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterDomainStmt *stmt = castNode(AlterDomainStmt, node); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode((Node *) stmt); - char *sqlStmt = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sqlStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterDomainStmt gets called after the domain has been altered locally. A - * change on the constraints could cause new (undistributed) objects to be dependencies of - * the domain. Here we recreate any new dependencies on the workers before we forward the - * alter statement to the workers. - */ -List * -PostprocessAlterDomainStmt(Node *node, const char *queryString) -{ - AlterDomainStmt *stmt = castNode(AlterDomainStmt, node); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&domainAddress); - return NIL; -} - - -/* - * PreprocessDomainRenameConstraintStmt gets called locally when a constraint on a domain - * is renamed. When the constraint is on a distributed domain we forward the statement - * appropriately. - */ -List * -PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_DOMCONSTRAINT); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode((Node *) stmt); - char *sqlStmt = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sqlStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterDomainOwnerStmt called locally when the owner of a constraint is - * changed. For distributed domains the statement is forwarded to all the workers. - */ -List * -PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_DOMAIN); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode((Node *) stmt); - char *sqlStmt = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sqlStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterDomainOwnerStmt given the change of ownership could cause new - * dependencies to exist for the domain we make sure all dependencies for the domain are - * created before we forward the statement to the workers. - */ -List * -PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&domainAddress); - return NIL; -} - - -/* - * PreprocessRenameDomainStmt creates the statements to execute on the workers when the - * domain being renamed is distributed. - */ -List * -PreprocessRenameDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_DOMAIN); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode((Node *) stmt); - char *sqlStmt = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sqlStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterDomainSchemaStmt cretes the statements to execute on the workers when - * the domain being moved to a new schema has been distributed. - */ -List * -PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_DOMAIN); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_DOMAIN); - - QualifyTreeNode((Node *) stmt); - char *sqlStmt = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sqlStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterDomainSchemaStmt makes sure any new dependencies (as result of the - * schema move) are created on the workers before we forward the statement. - */ -List * -PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - - ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&domainAddress)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&domainAddress); - return NIL; -} - - -/* - * FilterNameListForDistributedDomains given a liost of domain names we will return a list - * filtered with only the names of distributed domains remaining. The pointer to the list - * distributedDomainAddresses is populated with a list of ObjectAddresses of the domains - * that are distributed. Indices between the returned list and the object addresses are - * synchronizes. - * Note: the pointer in distributedDomainAddresses should not be omitted - * - * When missing_ok is false this function will raise an error if a domain identified by a - * domain name cannot be found. - */ -static List * -FilterNameListForDistributedDomains(List *domainNames, bool missing_ok, - List **distributedDomainAddresses) -{ - Assert(distributedDomainAddresses != NULL); - - List *distributedDomainNames = NIL; - TypeName *domainName = NULL; - foreach_ptr(domainName, domainNames) - { - ObjectAddress objectAddress = GetDomainAddressByName(domainName, missing_ok); - if (IsObjectDistributed(&objectAddress)) - { - distributedDomainNames = lappend(distributedDomainNames, domainName); - if (distributedDomainAddresses) - { - ObjectAddress *allocatedAddress = palloc0(sizeof(ObjectAddress)); - *allocatedAddress = objectAddress; - *distributedDomainAddresses = lappend(*distributedDomainAddresses, - allocatedAddress); - } - } - } - - return distributedDomainNames; -} - - /* * GetDomainAddressByName returns the ObjectAddress of the domain identified by * domainName. When missing_ok is true the object id part of the ObjectAddress can be diff --git a/src/backend/distributed/commands/foreign_server.c b/src/backend/distributed/commands/foreign_server.c index 0777814df..a5fa21c0e 100644 --- a/src/backend/distributed/commands/foreign_server.c +++ b/src/backend/distributed/commands/foreign_server.c @@ -25,238 +25,9 @@ #include "nodes/primnodes.h" static Node * RecreateForeignServerStmt(Oid serverId); -static bool NameListHasDistributedServer(List *serverNames); static ObjectAddress GetObjectAddressByServerName(char *serverName, bool missing_ok); -/* - * PreprocessCreateForeignServerStmt is called during the planning phase for - * CREATE SERVER. - */ -List * -PreprocessCreateForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_FOREIGN_SERVER); - - char *sql = DeparseTreeNode(node); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterForeignServerStmt is called during the planning phase for - * ALTER SERVER .. OPTIONS .. - */ -List * -PreprocessAlterForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node); - - ObjectAddress address = GetObjectAddressByServerName(stmt->servername, false); - - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - - char *sql = DeparseTreeNode(node); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessRenameForeignServerStmt is called during the planning phase for - * ALTER SERVER RENAME. - */ -List * -PreprocessRenameForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_FOREIGN_SERVER); - - ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false); - - /* filter distributed servers */ - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - - char *sql = DeparseTreeNode(node); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterForeignServerOwnerStmt is called during the planning phase for - * ALTER SERVER .. OWNER TO. - */ -List * -PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_FOREIGN_SERVER); - - ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false); - - /* filter distributed servers */ - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - - char *sql = DeparseTreeNode(node); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessDropForeignServerStmt is called during the planning phase for - * DROP SERVER. - */ -List * -PreprocessDropForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - Assert(stmt->removeType == OBJECT_FOREIGN_SERVER); - - bool includesDistributedServer = NameListHasDistributedServer(stmt->objects); - - if (!includesDistributedServer) - { - return NIL; - } - - if (list_length(stmt->objects) > 1) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot drop distributed server with other servers"), - errhint("Try dropping each object in a separate DROP command"))); - } - - if (!ShouldPropagate()) - { - return NIL; - } - - EnsureCoordinator(); - - Assert(list_length(stmt->objects) == 1); - - Value *serverValue = linitial(stmt->objects); - ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); - - /* unmark distributed server */ - UnmarkObjectDistributed(&address); - - const char *deparsedStmt = DeparseTreeNode((Node *) stmt); - - /* - * To prevent recursive propagation in mx architecture, we disable ddl - * propagation before sending the command to workers. - */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) deparsedStmt, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessCreateForeignServerStmt is called after a CREATE SERVER command has - * been executed by standard process utility. - */ -List * -PostprocessCreateForeignServerStmt(Node *node, const char *queryString) -{ - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - const bool missingOk = false; - ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - -/* - * PostprocessAlterForeignServerOwnerStmt is called after a ALTER SERVER OWNER command - * has been executed by standard process utility. - */ -List * -PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString) -{ - const bool missingOk = false; - ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); - - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - /* * CreateForeignServerStmtObjectAddress finds the ObjectAddress for the server * that is created by given CreateForeignServerStmt. If missingOk is false and if @@ -274,6 +45,41 @@ CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok) } +/* + * AlterForeignServerStmtObjectAddress finds the ObjectAddress for the server that is + * changed by given AlterForeignServerStmt. If missingOk is false and if + * the server does not exist, then it errors out. + * + * Never returns NULL, but the objid in the address can be invalid if missingOk + * was set to true. + */ +ObjectAddress +AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok) +{ + AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node); + + return GetObjectAddressByServerName(stmt->servername, missing_ok); +} + + +/* + * RenameForeignServerStmtObjectAddress finds the ObjectAddress for the server that is + * renamed by given RenmaeStmt. If missingOk is false and if the server does not exist, + * then it errors out. + * + * Never returns NULL, but the objid in the address can be invalid if missingOk + * was set to true. + */ +ObjectAddress +RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + Assert(stmt->renameType == OBJECT_FOREIGN_SERVER); + + return GetObjectAddressByServerName(strVal(stmt->object), missing_ok); +} + + /* * AlterForeignServerOwnerStmtObjectAddress finds the ObjectAddress for the server * given in AlterOwnerStmt. If missingOk is false and if @@ -355,28 +161,6 @@ RecreateForeignServerStmt(Oid serverId) } -/* - * NameListHasDistributedServer takes a namelist of servers and returns true if at least - * one of them is distributed. Returns false otherwise. - */ -static bool -NameListHasDistributedServer(List *serverNames) -{ - Value *serverValue = NULL; - foreach_ptr(serverValue, serverNames) - { - ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); - - if (IsObjectDistributed(&address)) - { - return true; - } - } - - return false; -} - - static ObjectAddress GetObjectAddressByServerName(char *serverName, bool missing_ok) { diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 879aa4770..43f5ba502 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1502,234 +1502,6 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString, } -/* - * PreprocessRenameFunctionStmt is called when the user is renaming a function. The invocation - * happens before the statement is applied locally. - * - * As the function already exists we have access to the ObjectAddress, this is used to - * check if it is distributed. If so the rename is executed on all the workers to keep the - * types in sync across the cluster. - */ -List * -PreprocessRenameFunctionStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - AssertObjectTypeIsFunctional(stmt->renameType); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateAlterFunction(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_FUNCTION); - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterFunctionSchemaStmt is executed before the statement is applied to the local - * postgres instance. - * - * In this stage we can prepare the commands that need to be run on all workers. - */ -List * -PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - AssertObjectTypeIsFunctional(stmt->objectType); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateAlterFunction(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_FUNCTION); - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterFunctionOwnerStmt is called for change of owner ship of functions before the owner - * ship is changed on the local instance. - * - * If the function for which the owner is changed is distributed we execute the change on - * all the workers to keep the type in sync across the cluster. - */ -List * -PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - AssertObjectTypeIsFunctional(stmt->objectType); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateAlterFunction(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_FUNCTION); - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterFunctionOwnerStmt is invoked after the owner has been changed locally. - * Since changing the owner could result in new dependencies being found for this object - * we re-ensure all the dependencies for the function do exist. - * - * This is solely to propagate the new owner (and all its dependencies) if it was not - * already distributed in the cluster. - */ -List * -PostprocessAlterFunctionOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - AssertObjectTypeIsFunctional(stmt->objectType); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateAlterFunction(&address)) - { - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - -/* - * PreprocessDropFunctionStmt gets called during the planning phase of a DROP FUNCTION statement - * and returns a list of DDLJob's that will drop any distributed functions from the - * workers. - * - * The DropStmt could have multiple objects to drop, the list of objects will be filtered - * to only keep the distributed functions for deletion on the workers. Non-distributed - * functions will still be dropped locally but not on the workers. - */ -List * -PreprocessDropFunctionStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - List *deletingObjectWithArgsList = stmt->objects; - List *distributedObjectWithArgsList = NIL; - List *distributedFunctionAddresses = NIL; - - AssertObjectTypeIsFunctional(stmt->removeType); - - if (creating_extension) - { - /* - * extensions should be created separately on the workers, types cascading from an - * extension should therefore not be propagated here. - */ - return NIL; - } - - if (!EnableMetadataSync) - { - /* - * we are configured to disable object propagation, should not propagate anything - */ - return NIL; - } - - - /* - * Our statements need to be fully qualified so we can drop them from the right schema - * on the workers - */ - QualifyTreeNode((Node *) stmt); - - /* - * iterate over all functions to be dropped and filter to keep only distributed - * functions. - */ - ObjectWithArgs *func = NULL; - foreach_ptr(func, deletingObjectWithArgsList) - { - ObjectAddress address = FunctionToObjectAddress(stmt->removeType, func, - stmt->missing_ok); - - if (!IsObjectDistributed(&address)) - { - continue; - } - - /* collect information for all distributed functions */ - ObjectAddress *addressp = palloc(sizeof(ObjectAddress)); - *addressp = address; - distributedFunctionAddresses = lappend(distributedFunctionAddresses, addressp); - distributedObjectWithArgsList = lappend(distributedObjectWithArgsList, func); - } - - if (list_length(distributedObjectWithArgsList) <= 0) - { - /* no distributed functions to drop */ - return NIL; - } - - /* - * managing types can only be done on the coordinator if ddl propagation is on. when - * it is off we will never get here. MX workers don't have a notion of distributed - * types, so we block the call. - */ - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_FUNCTION); - - /* remove the entries for the distributed objects on dropping */ - ObjectAddress *address = NULL; - foreach_ptr(address, distributedFunctionAddresses) - { - UnmarkObjectDistributed(address); - } - - /* - * Swap the list of objects before deparsing and restore the old list after. This - * ensures we only have distributed functions in the deparsed drop statement. - */ - DropStmt *stmtCopy = copyObject(stmt); - stmtCopy->objects = distributedObjectWithArgsList; - const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - /* * PreprocessAlterFunctionDependsStmt is called during the planning phase of an * ALTER FUNCION ... DEPENDS ON EXTENSION ... statement. Since functions depending on @@ -1803,30 +1575,6 @@ AlterFunctionDependsStmtObjectAddress(Node *node, bool missing_ok) } -/* - * PostprocessAlterFunctionSchemaStmt is executed after the change has been applied locally, - * we can now use the new dependencies of the function to ensure all its dependencies - * exist on the workers before we apply the commands remotely. - */ -List * -PostprocessAlterFunctionSchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - AssertObjectTypeIsFunctional(stmt->objectType); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateAlterFunction(&address)) - { - return NIL; - } - - /* dependencies have changed (schema) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - /* * AlterFunctionStmtObjectAddress returns the ObjectAddress of the subject in the * AlterFunctionStmt. If missing_ok is set to false an error will be raised if postgres diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index cdee81349..e75f4527d 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -182,43 +182,6 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString, } -/* - * PreprocessAlterSchemaRenameStmt is called when the user is renaming a schema. - * The invocation happens before the statement is applied locally. - * - * As the schema already exists we have access to the ObjectAddress for the schema, this - * is used to check if the schmea is distributed. If the schema is distributed the rename - * is executed on all the workers to keep the schemas in sync across the cluster. - */ -List * -PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false); - if (!ShouldPropagateObject(&schemaAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - /* fully qualify */ - QualifyTreeNode(node); - - /* deparse sql*/ - const char *renameStmtSql = DeparseTreeNode(node); - - EnsureSequentialMode(OBJECT_SCHEMA); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) renameStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - /* * CreateSchemaStmtObjectAddress returns the ObjectAddress of the schema that is * the object of the CreateSchemaStmt. Errors if missing_ok is false. diff --git a/src/backend/distributed/commands/text_search.c b/src/backend/distributed/commands/text_search.c index 1b5e84aa7..05319324d 100644 --- a/src/backend/distributed/commands/text_search.c +++ b/src/backend/distributed/commands/text_search.c @@ -42,8 +42,6 @@ #include "distributed/worker_create_or_replace.h" -static List * GetDistributedTextSearchConfigurationNames(DropStmt *stmt); -static List * GetDistributedTextSearchDictionaryNames(DropStmt *stmt); static DefineStmt * GetTextSearchConfigDefineStmt(Oid tsconfigOid); static DefineStmt * GetTextSearchDictionaryDefineStmt(Oid tsdictOid); static List * GetTextSearchDictionaryInitOptions(HeapTuple tup, Form_pg_ts_dict dict); @@ -59,113 +57,6 @@ static List * get_ts_template_namelist(Oid tstemplateOid); static Oid get_ts_config_parser_oid(Oid tsconfigOid); static char * get_ts_parser_tokentype_name(Oid parserOid, int32 tokentype); -/* - * PostprocessCreateTextSearchConfigurationStmt is called after the TEXT SEARCH - * CONFIGURATION has been created locally. - * - * Contrary to many other objects a text search configuration is often created as a copy - * of an existing configuration. After the copy there is no relation to the configuration - * that has been copied. This prevents our normal approach of ensuring dependencies to - * exist before forwarding a close ressemblance of the statement the user executed. - * - * Instead we recreate the object based on what we find in our own catalog, hence the - * amount of work we perform in the postprocess function, contrary to other objects. - */ -List * -PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString) -{ - DefineStmt *stmt = castNode(DefineStmt, node); - Assert(stmt->kind == OBJECT_TSCONFIGURATION); - - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - - DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address); - if (errMsg != NULL) - { - RaiseDeferredError(errMsg, WARNING); - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&address); - - /* - * TEXT SEARCH CONFIGURATION objects are more complex with their mappings and the - * possibility of copying from existing templates that we will require the idempotent - * recreation commands to be run for successful propagation - */ - List *commands = CreateTextSearchConfigDDLCommandsIdempotent(&address); - - commands = lcons(DISABLE_DDL_PROPAGATION, commands); - commands = lappend(commands, ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessCreateTextSearchDictionaryStmt is called after the TEXT SEARCH DICTIONARY has been - * created locally. - */ -List * -PostprocessCreateTextSearchDictionaryStmt(Node *node, const char *queryString) -{ - DefineStmt *stmt = castNode(DefineStmt, node); - Assert(stmt->kind == OBJECT_TSDICTIONARY); - - if (!ShouldPropagate()) - { - return NIL; - } - - /* check creation against multi-statement transaction policy */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - - DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address); - if (errMsg != NULL) - { - RaiseDeferredError(errMsg, WARNING); - return NIL; - } - - EnsureDependenciesExistOnAllNodes(&address); - - QualifyTreeNode(node); - const char *createTSDictionaryStmtSql = DeparseTreeNode(node); - - /* - * To prevent recursive propagation in mx architecture, we disable ddl - * propagation before sending the command to workers. - */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) createTSDictionaryStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - List * GetCreateTextSearchConfigStatements(const ObjectAddress *address) { @@ -234,602 +125,6 @@ CreateTextSearchDictDDLCommandsIdempotent(const ObjectAddress *address) } -/* - * PreprocessDropTextSearchConfigurationStmt prepares the statements we need to send to - * the workers. After we have dropped the configurations locally they also got removed from - * pg_dist_object so it is important to do all distribution checks before the change is - * made locally. - */ -List * -PreprocessDropTextSearchConfigurationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - Assert(stmt->removeType == OBJECT_TSCONFIGURATION); - - if (!ShouldPropagate()) - { - return NIL; - } - - List *distributedObjects = GetDistributedTextSearchConfigurationNames(stmt); - if (list_length(distributedObjects) == 0) - { - /* no distributed objects to remove */ - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - /* - * Temporarily replace the list of objects being dropped with only the list - * containing the distributed objects. After we have created the sql statement we - * restore the original list of objects to execute on locally. - * - * Because searchpaths on coordinator and workers might not be in sync we fully - * qualify the list before deparsing. This is safe because qualification doesn't - * change the original names in place, but insteads creates new ones. - */ - List *originalObjects = stmt->objects; - stmt->objects = distributedObjects; - QualifyTreeNode((Node *) stmt); - const char *dropStmtSql = DeparseTreeNode((Node *) stmt); - stmt->objects = originalObjects; - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessDropTextSearchDictionaryStmt prepares the statements we need to send to - * the workers. After we have dropped the dictionaries locally they also got removed from - * pg_dist_object so it is important to do all distribution checks before the change is - * made locally. - */ -List * -PreprocessDropTextSearchDictionaryStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - Assert(stmt->removeType == OBJECT_TSDICTIONARY); - - if (!ShouldPropagate()) - { - return NIL; - } - - List *distributedObjects = GetDistributedTextSearchDictionaryNames(stmt); - if (list_length(distributedObjects) == 0) - { - /* no distributed objects to remove */ - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - /* - * Temporarily replace the list of objects being dropped with only the list - * containing the distributed objects. After we have created the sql statement we - * restore the original list of objects to execute on locally. - * - * Because searchpaths on coordinator and workers might not be in sync we fully - * qualify the list before deparsing. This is safe because qualification doesn't - * change the original names in place, but insteads creates new ones. - */ - List *originalObjects = stmt->objects; - stmt->objects = distributedObjects; - QualifyTreeNode((Node *) stmt); - const char *dropStmtSql = DeparseTreeNode((Node *) stmt); - stmt->objects = originalObjects; - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * GetDistributedTextSearchConfigurationNames iterates over all text search configurations - * dropped, and create a list containing all configurations that are distributed. - */ -static List * -GetDistributedTextSearchConfigurationNames(DropStmt *stmt) -{ - List *objName = NULL; - List *distributedObjects = NIL; - foreach_ptr(objName, stmt->objects) - { - Oid tsconfigOid = get_ts_config_oid(objName, stmt->missing_ok); - if (!OidIsValid(tsconfigOid)) - { - /* skip missing configuration names, they can't be distributed */ - continue; - } - - ObjectAddress address = { 0 }; - ObjectAddressSet(address, TSConfigRelationId, tsconfigOid); - if (!IsObjectDistributed(&address)) - { - continue; - } - distributedObjects = lappend(distributedObjects, objName); - } - return distributedObjects; -} - - -/* - * GetDistributedTextSearchDictionaryNames iterates over all text search dictionaries - * dropped, and create a list containing all dictionaries that are distributed. - */ -static List * -GetDistributedTextSearchDictionaryNames(DropStmt *stmt) -{ - List *objName = NULL; - List *distributedObjects = NIL; - foreach_ptr(objName, stmt->objects) - { - Oid tsdictOid = get_ts_dict_oid(objName, stmt->missing_ok); - if (!OidIsValid(tsdictOid)) - { - /* skip missing dictionary names, they can't be distributed */ - continue; - } - - ObjectAddress address = { 0 }; - ObjectAddressSet(address, TSDictionaryRelationId, tsdictOid); - if (!IsObjectDistributed(&address)) - { - continue; - } - distributedObjects = lappend(distributedObjects, objName); - } - return distributedObjects; -} - - -/* - * PreprocessAlterTextSearchConfigurationStmt verifies if the configuration being altered - * is distributed in the cluster. If that is the case it will prepare the list of commands - * to send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchConfigurationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterTSConfigurationStmt *stmt = castNode(AlterTSConfigurationStmt, node); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - QualifyTreeNode((Node *) stmt); - const char *alterStmtSql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) alterStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTextSearchDictionaryStmt verifies if the dictionary being altered is - * distributed in the cluster. If that is the case it will prepare the list of commands to - * send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchDictionaryStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterTSDictionaryStmt *stmt = castNode(AlterTSDictionaryStmt, node); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - QualifyTreeNode((Node *) stmt); - const char *alterStmtSql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) alterStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessRenameTextSearchConfigurationStmt verifies if the configuration being altered - * is distributed in the cluster. If that is the case it will prepare the list of commands - * to send to the worker to apply the same changes remote. - */ -List * -PreprocessRenameTextSearchConfigurationStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - QualifyTreeNode((Node *) stmt); - - char *ddlCommand = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) ddlCommand, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessRenameTextSearchDictionaryStmt verifies if the dictionary being altered - * is distributed in the cluster. If that is the case it will prepare the list of commands - * to send to the worker to apply the same changes remote. - */ -List * -PreprocessRenameTextSearchDictionaryStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - RenameStmt *stmt = castNode(RenameStmt, node); - Assert(stmt->renameType == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - QualifyTreeNode((Node *) stmt); - - char *ddlCommand = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) ddlCommand, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTextSearchConfigurationSchemaStmt verifies if the configuration being - * altered is distributed in the cluster. If that is the case it will prepare the list of - * commands to send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchConfigurationSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, - stmt->missing_ok); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTextSearchDictionarySchemaStmt verifies if the dictionary being - * altered is distributed in the cluster. If that is the case it will prepare the list of - * commands to send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchDictionarySchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, - stmt->missing_ok); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterTextSearchConfigurationSchemaStmt is invoked after the schema has been - * changed locally. Since changing the schema could result in new dependencies being found - * for this object we re-ensure all the dependencies for the configuration do exist. This - * is solely to propagate the new schema (and all its dependencies) if it was not already - * distributed in the cluster. - */ -List * -PostprocessAlterTextSearchConfigurationSchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, - stmt->missing_ok); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - /* dependencies have changed (schema) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - -/* - * PostprocessAlterTextSearchDictionarySchemaStmt is invoked after the schema has been - * changed locally. Since changing the schema could result in new dependencies being found - * for this object we re-ensure all the dependencies for the dictionary do exist. This - * is solely to propagate the new schema (and all its dependencies) if it was not already - * distributed in the cluster. - */ -List * -PostprocessAlterTextSearchDictionarySchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, - stmt->missing_ok); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - /* dependencies have changed (schema) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - -/* - * PreprocessTextSearchConfigurationCommentStmt propagates any comment on a distributed - * configuration to the workers. Since comments for configurations are promenently shown - * when listing all text search configurations this is purely a cosmetic thing when - * running in MX. - */ -List * -PreprocessTextSearchConfigurationCommentStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - CommentStmt *stmt = castNode(CommentStmt, node); - Assert(stmt->objtype == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessTextSearchDictionaryCommentStmt propagates any comment on a distributed - * dictionary to the workers. Since comments for dictionaries are promenently shown - * when listing all text search dictionaries this is purely a cosmetic thing when - * running in MX. - */ -List * -PreprocessTextSearchDictionaryCommentStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - CommentStmt *stmt = castNode(CommentStmt, node); - Assert(stmt->objtype == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTextSearchConfigurationOwnerStmt verifies if the configuration being - * altered is distributed in the cluster. If that is the case it will prepare the list of - * commands to send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchConfigurationOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSCONFIGURATION); - - QualifyTreeNode((Node *) stmt); - char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTextSearchDictionaryOwnerStmt verifies if the dictionary being - * altered is distributed in the cluster. If that is the case it will prepare the list of - * commands to send to the worker to apply the same changes remote. - */ -List * -PreprocessAlterTextSearchDictionaryOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - EnsureCoordinator(); - EnsureSequentialMode(OBJECT_TSDICTIONARY); - - QualifyTreeNode((Node *) stmt); - char *sql = DeparseTreeNode((Node *) stmt); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterTextSearchConfigurationOwnerStmt is invoked after the owner has been - * changed locally. Since changing the owner could result in new dependencies being found - * for this object we re-ensure all the dependencies for the configuration do exist. This - * is solely to propagate the new owner (and all its dependencies) if it was not already - * distributed in the cluster. - */ -List * -PostprocessAlterTextSearchConfigurationOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_TSCONFIGURATION); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - /* dependencies have changed (owner) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - -/* - * PostprocessAlterTextSearchDictionaryOwnerStmt is invoked after the owner has been - * changed locally. Since changing the owner could result in new dependencies being found - * for this object we re-ensure all the dependencies for the dictionary do exist. This - * is solely to propagate the new owner (and all its dependencies) if it was not already - * distributed in the cluster. - */ -List * -PostprocessAlterTextSearchDictionaryOwnerStmt(Node *node, const char *queryString) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_TSDICTIONARY); - - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&address)) - { - return NIL; - } - - /* dependencies have changed (owner) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&address); - - return NIL; -} - - /* * GetTextSearchConfigDefineStmt returns the DefineStmt for a TEXT SEARCH CONFIGURATION * based on the configuration as defined in the catalog identified by tsconfigOid. diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 4973aafd0..69978a4a9 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -90,8 +90,6 @@ bool EnableCreateTypePropagation = true; /* forward declaration for helper functions*/ -static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok); -static List * TypeNameListToObjectAddresses(List *objects); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); static Oid GetTypeOwner(Oid typeOid); static Oid LookupNonAssociatedArrayTypeNameOid(ParseState *pstate, @@ -104,365 +102,6 @@ static List * CompositeTypeColumnDefList(Oid typeOid); static CreateEnumStmt * RecreateEnumStmt(Oid typeOid); static List * EnumValsList(Oid typeOid); -static bool ShouldPropagateTypeCreate(void); - - -/* - * PreprocessCompositeTypeStmt is called during the creation of a composite type. It is executed - * before the statement is applied locally. - * - * We decide if the compisite type needs to be replicated to the worker, and if that is - * the case return a list of DDLJob's that describe how and where the type needs to be - * created. - * - * Since the planning happens before the statement has been applied locally we do not have - * access to the ObjectAddress of the new type. - */ -List * -PreprocessCompositeTypeStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - if (!ShouldPropagateTypeCreate()) - { - return NIL; - } - - /* - * managing types can only be done on the coordinator if ddl propagation is on. when - * it is off we will never get here - */ - EnsureCoordinator(); - - /* fully qualify before lookup and later deparsing */ - QualifyTreeNode(node); - - return NIL; -} - - -/* - * PostprocessCompositeTypeStmt is executed after the type has been created locally and before - * we create it on the remote servers. Here we have access to the ObjectAddress of the new - * type which we use to make sure the type's dependencies are on all nodes. - */ -List * -PostprocessCompositeTypeStmt(Node *node, const char *queryString) -{ - /* same check we perform during planning of the statement */ - if (!ShouldPropagateTypeCreate()) - { - return NIL; - } - - /* - * find object address of the just created object, because the type has been created - * locally it can't be missing - */ - ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false); - - /* If the type has any unsupported dependency, create it locally */ - DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress); - if (errMsg != NULL) - { - RaiseDeferredError(errMsg, WARNING); - return NIL; - } - - /* - * when we allow propagation within a transaction block we should make sure to only - * allow this in sequential mode - */ - EnsureSequentialMode(OBJECT_TYPE); - - EnsureDependenciesExistOnAllNodes(&typeAddress); - - /* - * reconstruct creation statement in a portable fashion. The create_or_replace helper - * function will be used to create the type in an idempotent manner on the workers. - * - * Types could exist on the worker prior to being created on the coordinator when the - * type previously has been attempted to be created in a transaction which did not - * commit on the coordinator. - */ - const char *compositeTypeStmtSql = DeparseCompositeTypeStmt(node); - compositeTypeStmtSql = WrapCreateOrReplace(compositeTypeStmtSql); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) compositeTypeStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterTypeStmt is invoked for alter type statements for composite types. - * - * Normally we would have a process step as well to re-ensure dependencies exists, however - * this is already implemented by the post processing for adding columns to tables. - */ -List * -PreprocessAlterTypeStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterTableStmt *stmt = castNode(AlterTableStmt, node); - Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_TYPE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - /* reconstruct alter statement in a portable fashion */ - QualifyTreeNode((Node *) stmt); - const char *alterTypeStmtSql = DeparseTreeNode((Node *) stmt); - - /* - * all types that are distributed will need their alter statements propagated - * regardless if in a transaction or not. If we would not propagate the alter - * statement the types would be different on worker and coordinator. - */ - EnsureSequentialMode(OBJECT_TYPE); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) alterTypeStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessCreateEnumStmt is called before the statement gets applied locally. - * - * It decides if the create statement will be applied to the workers and if that is the - * case returns a list of DDLJobs that will be executed _after_ the statement has been - * applied locally. - * - * Since planning is done before we have created the object locally we do not have an - * ObjectAddress for the new type just yet. - */ -List * -PreprocessCreateEnumStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - if (!ShouldPropagateTypeCreate()) - { - return NIL; - } - - /* managing types can only be done on the coordinator */ - EnsureCoordinator(); - - /* enforce fully qualified typeName for correct deparsing and lookup */ - QualifyTreeNode(node); - - return NIL; -} - - -/* - * PostprocessCreateEnumStmt is called after the statement has been applied locally, but - * before the plan on how to create the types on the workers has been executed. - * - * We apply the same checks to verify if the type should be distributed, if that is the - * case we resolve the ObjectAddress for the just created object, distribute its - * dependencies to all the nodes, and mark the object as distributed. - */ -List * -PostprocessCreateEnumStmt(Node *node, const char *queryString) -{ - if (!ShouldPropagateTypeCreate()) - { - return NIL; - } - - /* lookup type address of just created type */ - ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false); - - DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress); - if (errMsg != NULL) - { - RaiseDeferredError(errMsg, WARNING); - return NIL; - } - - /* - * when we allow propagation within a transaction block we should make sure to only - * allow this in sequential mode - */ - EnsureSequentialMode(OBJECT_TYPE); - - EnsureDependenciesExistOnAllNodes(&typeAddress); - - /* reconstruct creation statement in a portable fashion */ - const char *createEnumStmtSql = DeparseCreateEnumStmt(node); - createEnumStmtSql = WrapCreateOrReplace(createEnumStmtSql); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) createEnumStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessAlterEnumStmt handles ALTER TYPE ... ADD VALUE for enum based types. Planning - * happens before the statement has been applied locally. - * - * Since it is an alter of an existing type we actually have the ObjectAddress. This is - * used to check if the type is distributed, if so the alter will be executed on the - * workers directly to keep the types in sync across the cluster. - */ -List * -PreprocessAlterEnumStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - /* - * alter enum will run for all distributed enums, regardless if in a transaction or - * not since the enum will be different on the coordinator and workers if we didn't. - * (adding values to an enum can not run in a transaction anyway and would error by - * postgres already). - */ - EnsureSequentialMode(OBJECT_TYPE); - - /* - * managing types can only be done on the coordinator if ddl propagation is on. when - * it is off we will never get here - */ - EnsureCoordinator(); - - QualifyTreeNode(node); - const char *alterEnumStmtSql = DeparseTreeNode(node); - - /* - * Before pg12 ALTER ENUM ... ADD VALUE could not be within a xact block. Instead of - * creating a DDLTaksList we won't return anything here. During the processing phase - * we directly connect to workers and execute the commands remotely. - */ - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) alterEnumStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessDropTypeStmt is called for all DROP TYPE statements. For all types in the list that - * citus has distributed to the workers it will drop the type on the workers as well. If - * no types in the drop list are distributed no calls will be made to the workers. - */ -List * -PreprocessDropTypeStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - DropStmt *stmt = castNode(DropStmt, node); - - /* - * We swap the list of objects to remove during deparse so we need a reference back to - * the old list to put back - */ - List *oldTypes = stmt->objects; - - if (!ShouldPropagate()) - { - return NIL; - } - - List *distributedTypes = FilterNameListForDistributedTypes(oldTypes, - stmt->missing_ok); - if (list_length(distributedTypes) <= 0) - { - /* no distributed types to drop */ - return NIL; - } - - /* - * managing types can only be done on the coordinator if ddl propagation is on. when - * it is off we will never get here. MX workers don't have a notion of distributed - * types, so we block the call. - */ - EnsureCoordinator(); - - /* - * remove the entries for the distributed objects on dropping - */ - List *distributedTypeAddresses = TypeNameListToObjectAddresses(distributedTypes); - ObjectAddress *address = NULL; - foreach_ptr(address, distributedTypeAddresses) - { - UnmarkObjectDistributed(address); - } - - /* - * temporary swap the lists of objects to delete with the distributed objects and - * deparse to an executable sql statement for the workers - */ - stmt->objects = distributedTypes; - char *dropStmtSql = DeparseTreeNode((Node *) stmt); - stmt->objects = oldTypes; - - EnsureSequentialMode(OBJECT_TYPE); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - dropStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PreprocessRenameTypeStmt is called when the user is renaming the type. The invocation happens - * before the statement is applied locally. - * - * As the type already exists we have access to the ObjectAddress for the type, this is - * used to check if the type is distributed. If the type is distributed the rename is - * executed on all the workers to keep the types in sync across the cluster. - */ -List * -PreprocessRenameTypeStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - /* fully qualify */ - QualifyTreeNode(node); - - /* deparse sql*/ - const char *renameStmtSql = DeparseTreeNode(node); - - EnsureSequentialMode(OBJECT_TYPE); - - /* to prevent recursion with mx we disable ddl propagation */ - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) renameStmtSql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - /* * PreprocessRenameTypeAttributeStmt is called for changes of attribute names for composite * types. Planning is called before the statement is applied locally. @@ -499,98 +138,6 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString, } -/* - * PreprocessAlterTypeSchemaStmt is executed before the statement is applied to the local - * postgres instance. - * - * In this stage we can prepare the commands that need to be run on all workers. - */ -List * -PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TYPE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_TYPE); - - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - -/* - * PostprocessAlterTypeSchemaStmt is executed after the change has been applied locally, we - * can now use the new dependencies of the type to ensure all its dependencies exist on - * the workers before we apply the commands remotely. - */ -List * -PostprocessAlterTypeSchemaStmt(Node *node, const char *queryString) -{ - AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); - Assert(stmt->objectType == OBJECT_TYPE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - /* dependencies have changed (schema) let's ensure they exist */ - EnsureDependenciesExistOnAllNodes(&typeAddress); - - return NIL; -} - - -/* - * PreprocessAlterTypeOwnerStmt is called for change of ownership of types before the - * ownership is changed on the local instance. - * - * If the type for which the owner is changed is distributed we execute the change on all - * the workers to keep the type in sync across the cluster. - */ -List * -PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext) -{ - AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); - Assert(stmt->objectType == OBJECT_TYPE); - - ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false); - if (!ShouldPropagateObject(&typeAddress)) - { - return NIL; - } - - EnsureCoordinator(); - - QualifyTreeNode((Node *) stmt); - const char *sql = DeparseTreeNode((Node *) stmt); - - EnsureSequentialMode(OBJECT_TYPE); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, - ENABLE_DDL_PROPAGATION); - - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); -} - - /* * CreateTypeStmtByObjectAddress returns a parsetree for the CREATE TYPE statement to * recreate the type by its object address. @@ -1051,60 +598,6 @@ GenerateBackupNameForTypeCollision(const ObjectAddress *address) } -/* - * FilterNameListForDistributedTypes takes a list of objects to delete, for Types this - * will be a list of TypeName. This list is filtered against the types that are - * distributed. - * - * The original list will not be touched, a new list will be created with only the objects - * in there. - */ -static List * -FilterNameListForDistributedTypes(List *objects, bool missing_ok) -{ - List *result = NIL; - TypeName *typeName = NULL; - foreach_ptr(typeName, objects) - { - Oid typeOid = LookupTypeNameOid(NULL, typeName, missing_ok); - ObjectAddress typeAddress = { 0 }; - - if (!OidIsValid(typeOid)) - { - continue; - } - - ObjectAddressSet(typeAddress, TypeRelationId, typeOid); - if (IsObjectDistributed(&typeAddress)) - { - result = lappend(result, typeName); - } - } - return result; -} - - -/* - * TypeNameListToObjectAddresses transforms a List * of TypeName *'s into a List * of - * ObjectAddress *'s. For this to succeed all Types identified by the TypeName *'s should - * exist on this postgres, an error will be thrown otherwise. - */ -static List * -TypeNameListToObjectAddresses(List *objects) -{ - List *result = NIL; - TypeName *typeName = NULL; - foreach_ptr(typeName, objects) - { - Oid typeOid = LookupTypeNameOid(NULL, typeName, false); - ObjectAddress *typeAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*typeAddress, TypeRelationId, typeOid); - result = lappend(result, typeAddress); - } - return result; -} - - /* * GetTypeOwner * @@ -1145,47 +638,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation) } -/* - * ShouldPropagateTypeCreate returns if we should propagate the creation of a type. - * - * There are two moments we decide to not directly propagate the creation of a type. - * - During the creation of an Extension; we assume the type will be created by creating - * the extension on the worker - * - During a transaction block; if types are used in a distributed table in the same - * block we can only provide parallelism on the table if we do not change to sequential - * mode. Types will be propagated outside of this transaction to the workers so that - * the transaction can use 1 connection per shard and fully utilize citus' parallelism - */ -static bool -ShouldPropagateTypeCreate() -{ - if (!ShouldPropagate()) - { - return false; - } - - if (!EnableCreateTypePropagation) - { - /* - * Administrator has turned of type creation propagation - */ - return false; - } - - /* - * by not propagating in a transaction block we allow for parallelism to be used when - * this type will be used as a column in a table that will be created and distributed - * in this same transaction. - */ - if (!ShouldPropagateCreateInCoordinatedTransction()) - { - return false; - } - - return true; -} - - /* * LookupNonAssociatedArrayTypeNameOid returns the oid of the type with the given type name * that is not an array type that is associated to another user defined type. diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index c45765bac..3f71e867a 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -525,6 +525,20 @@ ProcessUtilityInternal(PlannedStmt *pstmt, parsetree = pstmt->utilityStmt; ops = GetDistributeObjectOps(parsetree); + /* + * For some statements Citus defines a Qualify function. The goal of this function + * is to take any ambiguity from the statement that is contextual on either the + * search_path or current settings. + * Instead of relying on the search_path and settings we replace any deduced bits + * and fill them out how postgres would resolve them. This makes subsequent + * deserialize calls for the statement portable to other postgres servers, the + * workers in our case. + */ + if (ops && ops->qualify) + { + ops->qualify(parsetree); + } + if (ops && ops->preprocess) { ddlJobs = ops->preprocess(parsetree, queryString, context); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index a1e1719de..e5125aba0 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -63,6 +63,15 @@ typedef struct DistributeObjectOps List * (*postprocess)(Node *, const char *); ObjectAddress (*address)(Node *, bool); bool markDistributed; + + /* fields used by common implementations, omitted for specialized implementations */ + ObjectType objectType; + + /* + * Points to the varriable that contains the GUC'd feature flag, when turned off the + * common propagation functions will not propagate the creation of the object. + */ + bool *featureFlag; } DistributeObjectOps; #define CITUS_TRUNCATE_TRIGGER_NAME "citus_truncate_trigger" @@ -122,15 +131,21 @@ typedef enum SearchForeignKeyColumnFlags } SearchForeignKeyColumnFlags; -/* aggregate.c - forward declarations */ -extern List * PreprocessDefineAggregateStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessDefineAggregateStmt(Node *node, const char *queryString); - /* cluster.c - forward declarations */ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand, ProcessUtilityContext processUtilityContext); +/* common.c - forward declarations*/ +extern List * PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, + const char *queryString); +extern List * PreprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString, + ProcessUtilityContext + processUtilityContext); +extern List * PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString); +extern List * PreprocessDropDistributedObjectStmt(Node *node, const char *queryString, + ProcessUtilityContext + processUtilityContext); + /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); @@ -143,54 +158,17 @@ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *d extern char * CreateCollationDDL(Oid collationId); extern List * CreateCollationDDLsIdempotent(Oid collationId); extern ObjectAddress AlterCollationOwnerObjectAddress(Node *stmt, bool missing_ok); -extern List * PreprocessDropCollationStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterCollationOwnerStmt(Node *stmt, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PostprocessAlterCollationOwnerStmt(Node *node, const char *queryString); -extern List * PreprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessRenameCollationStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); extern ObjectAddress RenameCollationStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress AlterCollationSchemaStmtObjectAddress(Node *stmt, bool missing_ok); -extern List * PostprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString); extern char * GenerateBackupNameForCollationCollision(const ObjectAddress *address); extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_ok); -extern List * PreprocessDefineCollationStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString); /* database.c - forward declarations */ -extern List * PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString); extern ObjectAddress AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok); extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address); /* domain.c - forward declarations */ -extern List * PreprocessCreateDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessCreateDomainStmt(Node *node, const char *queryString); -extern List * PreprocessDropDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterDomainStmt(Node *node, const char *queryString); -extern List * PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString); -extern List * PreprocessRenameDomainStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString); extern ObjectAddress CreateDomainStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterDomainStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress DomainRenameConstraintStmtObjectAddress(Node *node, @@ -266,23 +244,9 @@ extern Oid GetReferencingTableId(Oid foreignKeyId); extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId); /* foreign_server.c - forward declarations */ -extern List * PreprocessCreateForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessRenameForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessDropForeignServerStmt(Node *node, const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString); -extern List * PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString); extern ObjectAddress CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok); +extern ObjectAddress AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok); +extern ObjectAddress RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool missing_ok); extern List * GetForeignServerCreateDDLCommand(Oid serverId); @@ -307,24 +271,12 @@ extern List * PreprocessAlterFunctionStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); extern ObjectAddress AlterFunctionStmtObjectAddress(Node *stmt, bool missing_ok); -extern List * PreprocessRenameFunctionStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); extern ObjectAddress RenameFunctionStmtObjectAddress(Node *stmt, bool missing_ok); -extern List * PreprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString); extern ObjectAddress AlterFunctionOwnerObjectAddress(Node *stmt, bool missing_ok); -extern List * PreprocessAlterFunctionSchemaStmt(Node *stmt, const char *queryString, - ProcessUtilityContext - processUtilityContext); extern ObjectAddress AlterFunctionSchemaStmtObjectAddress(Node *stmt, bool missing_ok); -extern List * PostprocessAlterFunctionSchemaStmt(Node *stmt, - const char *queryString); -extern List * PreprocessDropFunctionStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); extern List * PreprocessAlterFunctionDependsStmt(Node *stmt, const char *queryString, ProcessUtilityContext @@ -416,8 +368,6 @@ extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt, const char *alterObjectSchemaCommand); extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString, - ProcessUtilityContext processUtilityContext); extern ObjectAddress CreateSchemaStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok); @@ -508,70 +458,10 @@ extern bool ConstrTypeUsesIndex(ConstrType constrType); /* text_search.c - forward declarations */ -extern List * PostprocessCreateTextSearchConfigurationStmt(Node *node, - const char *queryString); -extern List * PostprocessCreateTextSearchDictionaryStmt(Node *node, - const char *queryString); extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address); extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address); extern List * CreateTextSearchConfigDDLCommandsIdempotent(const ObjectAddress *address); extern List * CreateTextSearchDictDDLCommandsIdempotent(const ObjectAddress *address); -extern List * PreprocessDropTextSearchConfigurationStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessDropTextSearchDictionaryStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchConfigurationStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchDictionaryStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessRenameTextSearchConfigurationStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessRenameTextSearchDictionaryStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchConfigurationSchemaStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchDictionarySchemaStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PostprocessAlterTextSearchConfigurationSchemaStmt(Node *node, - const char *queryString); -extern List * PostprocessAlterTextSearchDictionarySchemaStmt(Node *node, - const char *queryString); -extern List * PreprocessTextSearchConfigurationCommentStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessTextSearchDictionaryCommentStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchConfigurationOwnerStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PreprocessAlterTextSearchDictionaryOwnerStmt(Node *node, - const char *queryString, - ProcessUtilityContext - processUtilityContext); -extern List * PostprocessAlterTextSearchConfigurationOwnerStmt(Node *node, - const char *queryString); -extern List * PostprocessAlterTextSearchDictionaryOwnerStmt(Node *node, - const char *queryString); extern ObjectAddress CreateTextSearchConfigurationObjectAddress(Node *node, bool missing_ok); extern ObjectAddress CreateTextSearchDictObjectAddress(Node *node, @@ -604,28 +494,9 @@ extern List * get_ts_config_namelist(Oid tsconfigOid); extern void PreprocessTruncateStatement(TruncateStmt *truncateStatement); /* type.c - forward declarations */ -extern List * PreprocessCompositeTypeStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessCompositeTypeStmt(Node *stmt, const char *queryString); -extern List * PreprocessAlterTypeStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessCreateEnumStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessCreateEnumStmt(Node *stmt, const char *queryString); -extern List * PreprocessAlterEnumStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessDropTypeStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessRenameTypeStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); extern List * PreprocessRenameTypeAttributeStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PreprocessAlterTypeOwnerStmt(Node *stmt, const char *queryString, - ProcessUtilityContext processUtilityContext); -extern List * PostprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString); extern Node * CreateTypeStmtByObjectAddress(const ObjectAddress *address); extern ObjectAddress CompositeTypeStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress CreateEnumStmtObjectAddress(Node *stmt, bool missing_ok); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 0d9f125d8..5eab34cd8 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -250,6 +250,7 @@ extern TableConversionReturn * UndistributeTable(TableConversionParameters *para extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); +extern List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateObject(const ObjectAddress *address); diff --git a/src/test/regress/expected/propagate_foreign_servers.out b/src/test/regress/expected/propagate_foreign_servers.out index c0dbfcdb9..551d1dde7 100644 --- a/src/test/regress/expected/propagate_foreign_servers.out +++ b/src/test/regress/expected/propagate_foreign_servers.out @@ -92,11 +92,8 @@ SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c CREATE SERVER foreign_server_to_drop FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'test'); ---should error -DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop; -ERROR: cannot drop distributed server with other servers -HINT: Try dropping each object in a separate DROP command DROP FOREIGN TABLE foreign_table; +DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop; SELECT citus_remove_node('localhost', :master_port); citus_remove_node --------------------------------------------------------------------- diff --git a/src/test/regress/sql/propagate_foreign_servers.sql b/src/test/regress/sql/propagate_foreign_servers.sql index eea09b9ab..32cba12ef 100644 --- a/src/test/regress/sql/propagate_foreign_servers.sql +++ b/src/test/regress/sql/propagate_foreign_servers.sql @@ -55,9 +55,8 @@ CREATE SERVER foreign_server_to_drop FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'test'); ---should error -DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop; DROP FOREIGN TABLE foreign_table; +DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop; SELECT citus_remove_node('localhost', :master_port); SET client_min_messages TO ERROR;