From 514c6a76ac75d25dbf47b2250d50dc5abdb14c54 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Tue, 1 Dec 2020 18:46:44 +0300 Subject: [PATCH 1/2] Propagate alter schema rename --- .../commands/distribute_object_ops.c | 12 +++ src/backend/distributed/commands/schema.c | 93 +++++++++++++++++++ .../deparser/deparse_schema_stmts.c | 24 +++++ src/include/distributed/commands.h | 3 +- src/include/distributed/deparser.h | 1 + 5 files changed, 132 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 40f72f650..a856003d6 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -400,6 +400,13 @@ static DistributeObjectOps Schema_Grant = { .postprocess = NULL, .address = NULL, }; +static DistributeObjectOps Schema_Rename = { + .deparse = DeparseAlterSchemaRenameStmt, + .qualify = NULL, + .preprocess = PreprocessAlterSchemaRenameStmt, + .postprocess = NULL, + .address = AlterSchemaRenameStmtObjectAddress, +}; static DistributeObjectOps Table_AlterTable = { .deparse = NULL, .qualify = NULL, @@ -871,6 +878,11 @@ GetDistributeObjectOps(Node *node) return &Routine_Rename; } + case OBJECT_SCHEMA: + { + return &Schema_Rename; + } + case OBJECT_TYPE: { return &Type_Rename; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index fdce4fff0..1c4e1286e 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -15,6 +15,7 @@ #include "access/heapam.h" #include "access/htup.h" #include "access/htup_details.h" +#include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "catalog/pg_namespace.h" @@ -26,7 +27,9 @@ #include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include +#include "distributed/multi_executor.h" #include "distributed/reference_table_utils.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include #include @@ -38,6 +41,7 @@ static List * FilterDistributedSchemas(List *schemas); +static void EnsureSequentialModeForSchemaDDL(void); /* @@ -153,6 +157,57 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString) } +/* + * PreprocessAlterSchemaRenameStmt is called when the user is renaming a schema. + * The invocation happens before the statement is applied locally. + * + * As the schema already exists we have access to the ObjectAddress for the schema, this + * is used to check if the schmea is distributed. If the schema is distributed the rename + * is executed on all the workers to keep the schemas in sync across the cluster. + */ +List * +PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString) +{ + ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false); + if (!ShouldPropagateObject(&schemaAddress)) + { + return NIL; + } + + /* deparse sql*/ + const char *renameStmtSql = DeparseTreeNode(node); + + EnsureSequentialModeForSchemaDDL(); + + /* to prevent recursion with mx we disable ddl propagation */ + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) renameStmtSql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * AlterSchemaRenameStmtObjectAddress returns the ObjectAddress of the schema that is + * the object of the RenameStmt. Errors if missing_ok is false. + */ +ObjectAddress +AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + Assert(stmt->renameType == OBJECT_SCHEMA); + + const char *schemaName = stmt->subname; + Oid schemaOid = get_namespace_oid(schemaName, missing_ok); + + ObjectAddress address = { 0 }; + ObjectAddressSet(address, NamespaceRelationId, schemaOid); + + return address; +} + + /* * FilterDistributedSchemas filters the schema list and returns the distributed ones * as a list @@ -186,3 +241,41 @@ FilterDistributedSchemas(List *schemas) return distributedSchemas; } + + +/* + * EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the begining. + * + * Copy-pasted from type.c + */ +static void +EnsureSequentialModeForSchemaDDL(void) +{ + if (!IsTransactionBlock()) + { + /* we do not need to switch to sequential mode if we are not in a transaction */ + return; + } + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot create or modify type because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail("When creating or altering a schema, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail("Schema is created or altered. To make sure subsequent " + "commands see the schema correctly we need to make sure to " + "use only one connection for all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} diff --git a/src/backend/distributed/deparser/deparse_schema_stmts.c b/src/backend/distributed/deparser/deparse_schema_stmts.c index 2aa4a6439..35321f2e6 100644 --- a/src/backend/distributed/deparser/deparse_schema_stmts.c +++ b/src/backend/distributed/deparser/deparse_schema_stmts.c @@ -21,6 +21,7 @@ static void AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaPrivileges(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaSchemas(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt); +static void AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt); char * DeparseGrantOnSchemaStmt(Node *node) @@ -37,6 +38,20 @@ DeparseGrantOnSchemaStmt(Node *node) } +char * +DeparseAlterSchemaRenameStmt(Node *node) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + + StringInfoData str = { 0 }; + initStringInfo(&str); + + AppendAlterSchemaRenameStmt(&str, stmt); + + return str.data; +} + + static void AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt) { @@ -131,3 +146,12 @@ AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt) } } } + + +static void +AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt) +{ + Assert(stmt->renameType == OBJECT_SCHEMA); + + appendStringInfo(buf, "ALTER SCHEMA %s RENAME TO %s;", stmt->subname, stmt->newname); +} diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 292521641..3bd7b4967 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -227,7 +227,8 @@ extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement, extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt, const char *alterObjectSchemaCommand); extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString); - +extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString); +extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok); /* sequence.c - forward declarations */ extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 62d8dd707..475ff4515 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -53,6 +53,7 @@ extern void QualifyAlterTableSchemaStmt(Node *stmt); /* forward declarations for deparse_schema_stmts.c */ extern char * DeparseGrantOnSchemaStmt(Node *stmt); +extern char * DeparseAlterSchemaRenameStmt(Node *stmt); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt); From 5242dcfe991a5d209f33d7565f48ba76382f0782 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Tue, 1 Dec 2020 19:29:13 +0300 Subject: [PATCH 2/2] Add tests for propagating alter schema rename --- src/backend/distributed/commands/schema.c | 5 +- .../deparser/deparse_schema_stmts.c | 3 +- .../regress/expected/multi_schema_support.out | 58 ++++++++++++++++++- src/test/regress/sql/multi_schema_support.sql | 29 +++++++++- 4 files changed, 90 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 1c4e1286e..795ec9f07 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -174,6 +174,9 @@ PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString) return NIL; } + /* fully qualify */ + QualifyTreeNode(node); + /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); @@ -262,7 +265,7 @@ EnsureSequentialModeForSchemaDDL(void) if (ParallelQueryExecutedInTransaction()) { - ereport(ERROR, (errmsg("cannot create or modify type because there was a " + ereport(ERROR, (errmsg("cannot create or modify schema because there was a " "parallel operation on a distributed table in the " "transaction"), errdetail("When creating or altering a schema, Citus needs to " diff --git a/src/backend/distributed/deparser/deparse_schema_stmts.c b/src/backend/distributed/deparser/deparse_schema_stmts.c index 35321f2e6..b09471983 100644 --- a/src/backend/distributed/deparser/deparse_schema_stmts.c +++ b/src/backend/distributed/deparser/deparse_schema_stmts.c @@ -153,5 +153,6 @@ AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt) { Assert(stmt->renameType == OBJECT_SCHEMA); - appendStringInfo(buf, "ALTER SCHEMA %s RENAME TO %s;", stmt->subname, stmt->newname); + appendStringInfo(buf, "ALTER SCHEMA %s RENAME TO %s;", + quote_identifier(stmt->subname), quote_identifier(stmt->newname)); } diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 53d34abce..67c00092b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1442,6 +1442,58 @@ ALTER TABLE "CiTuS.TeeN"."TeeNTabLE.1!?!" ADD COLUMN "NEW_TeeN:COl" text; DELETE FROM "CiTuS.TeeN"."TeeNTabLE.1!?!" WHERE "TeNANt_Id"=1; -- Some more DDL ALTER TABLE "CiTuS.TeeN"."TeeNTabLE.1!?!" ADD CONSTRAINT "ConsNAmE<>" PRIMARY KEY ("TeNANt_Id"); +-- test schema rename propagation +CREATE SCHEMA foo; +CREATE TABLE foo.test (x int, y int); +SELECT create_distributed_table('foo.test', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO foo.test VALUES (1, 1), (2, 2); +ALTER SCHEMA foo rename to bar; +SELECT COUNT(*) FROM bar.test; + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- test propagation with weird name +ALTER SCHEMA "CiTuS.TeeN" RENAME TO "Citus'Teen123"; +SELECT * FROM "Citus'Teen123"."TeeNTabLE.1!?!" ORDER BY id; + id | TeNANt_Id | NEW_TeeN:COl +--------------------------------------------------------------------- + 1 | 0 | + 2 | 3 | + 3 | 2 | + 4 | 4 | +(4 rows) + +-- test error +INSERT INTO bar.test VALUES (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9); +BEGIN; + SELECT COUNT(*) FROM bar.test; + count +--------------------------------------------------------------------- + 9 +(1 row) + + ALTER SCHEMA bar RENAME TO foo; +ERROR: cannot create or modify schema because there was a parallel operation on a distributed table in the transaction +DETAIL: When creating or altering a schema, Citus needs to perform all operations over a single connection per node to ensure consistency. +HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" +ROLLBACK; +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT COUNT(*) FROM bar.test; + count +--------------------------------------------------------------------- + 9 +(1 row) + + ALTER SCHEMA bar RENAME TO foo; +ROLLBACK; -- Clean up the created schema DROP SCHEMA run_test_schema CASCADE; NOTICE: drop cascades to table run_test_schema.test_table @@ -1451,7 +1503,9 @@ DETAIL: drop cascades to table test_schema_support_join_1.nation_hash drop cascades to table test_schema_support_join_1.nation_hash_2 DROP SCHEMA test_schema_support_join_2 CASCADE; NOTICE: drop cascades to table test_schema_support_join_2.nation_hash -DROP SCHEMA "CiTuS.TeeN" CASCADE; -NOTICE: drop cascades to table "CiTuS.TeeN"."TeeNTabLE.1!?!" +DROP SCHEMA "Citus'Teen123" CASCADE; +NOTICE: drop cascades to table "Citus'Teen123"."TeeNTabLE.1!?!" DROP SCHEMA "CiTUS.TEEN2" CASCADE; NOTICE: drop cascades to table "CiTUS.TEEN2"."CAPITAL_TABLE" +DROP SCHEMA bar CASCADE; +NOTICE: drop cascades to table bar.test diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 3c41aae02..05843f84d 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -991,9 +991,36 @@ DELETE FROM "CiTuS.TeeN"."TeeNTabLE.1!?!" WHERE "TeNANt_Id"=1; -- Some more DDL ALTER TABLE "CiTuS.TeeN"."TeeNTabLE.1!?!" ADD CONSTRAINT "ConsNAmE<>" PRIMARY KEY ("TeNANt_Id"); +-- test schema rename propagation +CREATE SCHEMA foo; +CREATE TABLE foo.test (x int, y int); +SELECT create_distributed_table('foo.test', 'x'); +INSERT INTO foo.test VALUES (1, 1), (2, 2); +ALTER SCHEMA foo rename to bar; +SELECT COUNT(*) FROM bar.test; + +-- test propagation with weird name +ALTER SCHEMA "CiTuS.TeeN" RENAME TO "Citus'Teen123"; +SELECT * FROM "Citus'Teen123"."TeeNTabLE.1!?!" ORDER BY id; + +-- test error +INSERT INTO bar.test VALUES (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9); + +BEGIN; + SELECT COUNT(*) FROM bar.test; + ALTER SCHEMA bar RENAME TO foo; +ROLLBACK; + +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT COUNT(*) FROM bar.test; + ALTER SCHEMA bar RENAME TO foo; +ROLLBACK; + -- Clean up the created schema DROP SCHEMA run_test_schema CASCADE; DROP SCHEMA test_schema_support_join_1 CASCADE; DROP SCHEMA test_schema_support_join_2 CASCADE; -DROP SCHEMA "CiTuS.TeeN" CASCADE; +DROP SCHEMA "Citus'Teen123" CASCADE; DROP SCHEMA "CiTUS.TEEN2" CASCADE; +DROP SCHEMA bar CASCADE;