Propagate alter schema rename

pull/4373/head
Ahmet Gedemenli 2020-12-01 18:46:44 +03:00
parent fde93072dd
commit 514c6a76ac
5 changed files with 132 additions and 1 deletions

View File

@ -400,6 +400,13 @@ static DistributeObjectOps Schema_Grant = {
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
}; };
static DistributeObjectOps Schema_Rename = {
.deparse = DeparseAlterSchemaRenameStmt,
.qualify = NULL,
.preprocess = PreprocessAlterSchemaRenameStmt,
.postprocess = NULL,
.address = AlterSchemaRenameStmtObjectAddress,
};
static DistributeObjectOps Table_AlterTable = { static DistributeObjectOps Table_AlterTable = {
.deparse = NULL, .deparse = NULL,
.qualify = NULL, .qualify = NULL,
@ -871,6 +878,11 @@ GetDistributeObjectOps(Node *node)
return &Routine_Rename; return &Routine_Rename;
} }
case OBJECT_SCHEMA:
{
return &Schema_Rename;
}
case OBJECT_TYPE: case OBJECT_TYPE:
{ {
return &Type_Rename; return &Type_Rename;

View File

@ -15,6 +15,7 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup.h" #include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_namespace.h" #include "catalog/pg_namespace.h"
@ -26,7 +27,9 @@
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include <distributed/metadata_sync.h> #include <distributed/metadata_sync.h>
#include "distributed/multi_executor.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include <distributed/remote_commands.h> #include <distributed/remote_commands.h>
#include <distributed/remote_commands.h> #include <distributed/remote_commands.h>
@ -38,6 +41,7 @@
static List * FilterDistributedSchemas(List *schemas); static List * FilterDistributedSchemas(List *schemas);
static void EnsureSequentialModeForSchemaDDL(void);
/* /*
@ -153,6 +157,57 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString)
} }
/*
* PreprocessAlterSchemaRenameStmt is called when the user is renaming a schema.
* The invocation happens before the statement is applied locally.
*
* As the schema already exists we have access to the ObjectAddress for the schema, this
* is used to check if the schmea is distributed. If the schema is distributed the rename
* is executed on all the workers to keep the schemas in sync across the cluster.
*/
List *
PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString)
{
ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false);
if (!ShouldPropagateObject(&schemaAddress))
{
return NIL;
}
/* deparse sql*/
const char *renameStmtSql = DeparseTreeNode(node);
EnsureSequentialModeForSchemaDDL();
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* AlterSchemaRenameStmtObjectAddress returns the ObjectAddress of the schema that is
* the object of the RenameStmt. Errors if missing_ok is false.
*/
ObjectAddress
AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SCHEMA);
const char *schemaName = stmt->subname;
Oid schemaOid = get_namespace_oid(schemaName, missing_ok);
ObjectAddress address = { 0 };
ObjectAddressSet(address, NamespaceRelationId, schemaOid);
return address;
}
/* /*
* FilterDistributedSchemas filters the schema list and returns the distributed ones * FilterDistributedSchemas filters the schema list and returns the distributed ones
* as a list * as a list
@ -186,3 +241,41 @@ FilterDistributedSchemas(List *schemas)
return distributedSchemas; return distributedSchemas;
} }
/*
* EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in
* sequential mode, or can still safely be put in sequential mode, it errors if that is
* not possible. The error contains information for the user to retry the transaction with
* sequential mode set from the begining.
*
* Copy-pasted from type.c
*/
static void
EnsureSequentialModeForSchemaDDL(void)
{
if (!IsTransactionBlock())
{
/* we do not need to switch to sequential mode if we are not in a transaction */
return;
}
if (ParallelQueryExecutedInTransaction())
{
ereport(ERROR, (errmsg("cannot create or modify type because there was a "
"parallel operation on a distributed table in the "
"transaction"),
errdetail("When creating or altering a schema, Citus needs to "
"perform all operations over a single connection per "
"node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail("Schema is created or altered. To make sure subsequent "
"commands see the schema correctly we need to make sure to "
"use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential();
}

View File

@ -21,6 +21,7 @@ static void AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt);
static void AppendGrantOnSchemaPrivileges(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaPrivileges(StringInfo buf, GrantStmt *stmt);
static void AppendGrantOnSchemaSchemas(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaSchemas(StringInfo buf, GrantStmt *stmt);
static void AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt); static void AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt);
static void AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt);
char * char *
DeparseGrantOnSchemaStmt(Node *node) DeparseGrantOnSchemaStmt(Node *node)
@ -37,6 +38,20 @@ DeparseGrantOnSchemaStmt(Node *node)
} }
char *
DeparseAlterSchemaRenameStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
AppendAlterSchemaRenameStmt(&str, stmt);
return str.data;
}
static void static void
AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt) AppendGrantOnSchemaStmt(StringInfo buf, GrantStmt *stmt)
{ {
@ -131,3 +146,12 @@ AppendGrantOnSchemaGrantees(StringInfo buf, GrantStmt *stmt)
} }
} }
} }
static void
AppendAlterSchemaRenameStmt(StringInfo buf, RenameStmt *stmt)
{
Assert(stmt->renameType == OBJECT_SCHEMA);
appendStringInfo(buf, "ALTER SCHEMA %s RENAME TO %s;", stmt->subname, stmt->newname);
}

View File

@ -227,7 +227,8 @@ extern List * PreprocessDropSchemaStmt(Node *dropSchemaStatement,
extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt, extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand); const char *alterObjectSchemaCommand);
extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString); extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString);
extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok);
/* sequence.c - forward declarations */ /* sequence.c - forward declarations */
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);

View File

@ -53,6 +53,7 @@ extern void QualifyAlterTableSchemaStmt(Node *stmt);
/* forward declarations for deparse_schema_stmts.c */ /* forward declarations for deparse_schema_stmts.c */
extern char * DeparseGrantOnSchemaStmt(Node *stmt); extern char * DeparseGrantOnSchemaStmt(Node *stmt);
extern char * DeparseAlterSchemaRenameStmt(Node *stmt);
/* forward declarations for deparse_type_stmts.c */ /* forward declarations for deparse_type_stmts.c */
extern char * DeparseCompositeTypeStmt(Node *stmt); extern char * DeparseCompositeTypeStmt(Node *stmt);