From 514c6a76ac75d25dbf47b2250d50dc5abdb14c54 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Tue, 1 Dec 2020 18:46:44 +0300 Subject: [PATCH] Propagate alter schema rename --- .../commands/distribute_object_ops.c | 12 +++ src/backend/distributed/commands/schema.c | 93 +++++++++++++++++++ .../deparser/deparse_schema_stmts.c | 24 +++++ src/include/distributed/commands.h | 3 +- src/include/distributed/deparser.h | 1 + 5 files changed, 132 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 40f72f650..a856003d6 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -400,6 +400,13 @@ static DistributeObjectOps Schema_Grant = { .postprocess = NULL, .address = NULL, }; +static DistributeObjectOps Schema_Rename = { + .deparse = DeparseAlterSchemaRenameStmt, + .qualify = NULL, + .preprocess = PreprocessAlterSchemaRenameStmt, + .postprocess = NULL, + .address = AlterSchemaRenameStmtObjectAddress, +}; static DistributeObjectOps Table_AlterTable = { .deparse = NULL, .qualify = NULL, @@ -871,6 +878,11 @@ GetDistributeObjectOps(Node *node) return &Routine_Rename; } + case OBJECT_SCHEMA: + { + return &Schema_Rename; + } + case OBJECT_TYPE: { return &Type_Rename; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index fdce4fff0..1c4e1286e 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -15,6 +15,7 @@ #include "access/heapam.h" #include "access/htup.h" #include "access/htup_details.h" +#include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "catalog/pg_namespace.h" @@ -26,7 +27,9 @@ #include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include +#include "distributed/multi_executor.h" #include "distributed/reference_table_utils.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include #include @@ -38,6 +41,7 @@ static List * FilterDistributedSchemas(List *schemas); +static void EnsureSequentialModeForSchemaDDL(void); /* @@ -153,6 +157,57 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString) } +/* + * PreprocessAlterSchemaRenameStmt is called when the user is renaming a schema. + * The invocation happens before the statement is applied locally. + * + * As the schema already exists we have access to the ObjectAddress for the schema, this + * is used to check if the schmea is distributed. If the schema is distributed the rename + * is executed on all the workers to keep the schemas in sync across the cluster. + */ +List * +PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString) +{ + ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false); + if (!ShouldPropagateObject(&schemaAddress)) + { + return NIL; + } + + /* deparse sql*/ + const char *renameStmtSql = DeparseTreeNode(node); + + EnsureSequentialModeForSchemaDDL(); + + /* to prevent recursion with mx we disable ddl propagation */ + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) renameStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * AlterSchemaRenameStmtObjectAddress returns the ObjectAddress of the schema that is + * the object of the RenameStmt. Errors if missing_ok is false. + */ +ObjectAddress +AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + Assert(stmt->renameType == OBJECT_SCHEMA); + + const char *schemaName = stmt->subname; + Oid schemaOid = get_namespace_oid(schemaName, missing_ok); + + ObjectAddress address = { 0 }; + ObjectAddressSet(address, NamespaceRelationId, schemaOid); + + return address; +} + + /* * FilterDistributedSchemas filters the schema list and returns the distributed ones * as a list @@ -186,3 +241,41 @@ FilterDistributedSchemas(List *schemas) return distributedSchemas; } + + +/* + * EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the begining. + * + * Copy-pasted from type.c + */ +static void +EnsureSequentialModeForSchemaDDL(void) +{ + if (!IsTransactionBlock()) + { + /* we do not need to switch to sequential mode if we are not in a transaction */ + return; + } + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot create or modify type because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail("When creating or altering a schema, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail("Schema is created or altered. To make sure subsequent " + "commands see the schema correctly we need to make sure to " + "use only one connection for all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} diff --git a/src/backend/distributed/deparser/deparse_schema_stmts.c b/src/backend/distributed/deparser/deparse_schema_stmts.c index 2aa4a6439..35321f2e6 100644 --- a/src/backend/distributed/deparser/deparse_schema_stmts.c +++ b/src/backend/distributed/deparser/deparse_schema_stmts.c @@ -21,6 +21,7 @@ static void AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaPrivileges(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaSchemas(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt); +static void AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt); char * DeparseGrantOnSchemaStmt(Node *node) @@ -37,6 +38,20 @@ DeparseGrantOnSchemaStmt(Node *node) } +char * +DeparseAlterSchemaRenameStmt(Node *node) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + + StringInfoData str = { 0 }; + initStringInfo(&str); + + AppendAlterSchemaRenameStmt(&str, stmt); + + return str.data; +} + + static void AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt) { @@ -131,3 +146,12 @@ AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt) } } } + + +static void +AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt) +{ + Assert(stmt->renameType == OBJECT_SCHEMA); + + appendStringInfo(buf, "ALTER SCHEMA %s RENAME TO %s;", stmt->subname, stmt->newname); +} diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 292521641..3bd7b4967 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -227,7 +227,8 @@ extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement, extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt, const char *alterObjectSchemaCommand); extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString); - +extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString); +extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok); /* sequence.c - forward declarations */ extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 62d8dd707..475ff4515 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -53,6 +53,7 @@ extern void QualifyAlterTableSchemaStmt(Node *stmt); /* forward declarations for deparse_schema_stmts.c */ extern char * DeparseGrantOnSchemaStmt(Node *stmt); +extern char * DeparseAlterSchemaRenameStmt(Node *stmt); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt);