Adds alter database propagation

pull/7178/head
gindibay 2023-09-01 10:21:41 +03:00
parent 5034f8eba5
commit 09ab1ac4bb
5 changed files with 378 additions and 2 deletions

View File

@ -147,3 +147,123 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
} }
/*
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*/
List *
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*/
List *
PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
AlterDatabaseSetStmt *stmt = castNode(AlterDatabaseSetStmt, node);
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*/
List *
PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
RenameStmt *stmt = castNode(RenameStmt, node);
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers to grant
* on databases.
*/
List *
PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
AlterDatabaseRefreshCollStmt *stmt = castNode(AlterDatabaseRefreshCollStmt, node);
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -444,6 +444,49 @@ static DistributeObjectOps Database_Grant = {
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Database_Alter = {
.deparse = DeparseAlterDatabaseStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDatabaseStmt,
.postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Database_Set = {
.deparse = DeparseAlterDatabaseSetStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDatabaseSetStmt,
.postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Database_Rename = {
.deparse = DeparseAlterDatabaseRenameStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDatabaseRenameStmt,
.postprocess = NULL,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = NULL, /* TODO: RenameDatabaseStmtObjectAddress, */
.markDistributed = false,
};
static DistributeObjectOps Database_RefreshColl = {
.deparse = DeparseAlterDatabaseRefreshCollStmt,
.qualify = NULL,
.preprocess = PreprocessAlterDatabaseRefreshCollStmt,
.postprocess = NULL,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Domain_Alter = { static DistributeObjectOps Domain_Alter = {
.deparse = DeparseAlterDomainStmt, .deparse = DeparseAlterDomainStmt,
.qualify = QualifyAlterDomainStmt, .qualify = QualifyAlterDomainStmt,
@ -1272,7 +1315,6 @@ static DistributeObjectOps Trigger_Rename = {
.markDistributed = false, .markDistributed = false,
}; };
/* /*
* GetDistributeObjectOps looks up the DistributeObjectOps which handles the node. * GetDistributeObjectOps looks up the DistributeObjectOps which handles the node.
* *
@ -1283,6 +1325,21 @@ GetDistributeObjectOps(Node *node)
{ {
switch (nodeTag(node)) switch (nodeTag(node))
{ {
case T_AlterDatabaseStmt:
{
return &Database_Alter;
}
case T_AlterDatabaseRefreshCollStmt:
{
return &Database_RefreshColl;
}
case T_AlterDatabaseSetStmt:
{
return &Database_Set;
}
case T_AlterDomainStmt: case T_AlterDomainStmt:
{ {
return &Domain_Alter; return &Domain_Alter;
@ -1975,6 +2032,11 @@ GetDistributeObjectOps(Node *node)
return &Collation_Rename; return &Collation_Rename;
} }
case OBJECT_DATABASE:
{
return &Database_Rename;
}
case OBJECT_DOMAIN: case OBJECT_DOMAIN:
{ {
return &Domain_Rename; return &Domain_Rename;

View File

@ -20,9 +20,12 @@
#include "distributed/deparser.h" #include "distributed/deparser.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/log_utils.h"
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt);
char * char *
DeparseAlterDatabaseOwnerStmt(Node *node) DeparseAlterDatabaseOwnerStmt(Node *node)
@ -82,6 +85,120 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
} }
static void
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
{
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
if (stmt->options)
{
ListCell *cell = NULL;
appendStringInfo(buf, "WITH OPTION ");
foreach(cell, stmt->options)
{
DefElem *def = castNode(DefElem, lfirst(cell));
appendStringInfo(buf, "%s %s", quote_identifier(def->defname),
quote_literal_cstr(strVal(def->arg)));
if (cell != list_tail(stmt->options))
{
appendStringInfo(buf, ", ");
}
}
}
appendStringInfo(buf, ";");
}
static void
AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt)
{
appendStringInfo(buf, "ALTER DATABASE %s SET ", quote_identifier(stmt->dbname));
VariableSetStmt *varSetStmt = castNode(VariableSetStmt, stmt->setstmt);
if (varSetStmt->kind == VAR_SET_VALUE)
{
appendStringInfo(buf, "%s = %s", quote_identifier(varSetStmt->name),
quote_literal_cstr(strVal(varSetStmt->args)));
}
else if (varSetStmt->kind == VAR_RESET_ALL)
{
appendStringInfo(buf, "RESET ALL");
}
else if (varSetStmt->kind == VAR_RESET)
{
appendStringInfo(buf, "RESET %s", quote_identifier(varSetStmt->name));
}
else
{
ereport(ERROR,
errmsg("unrecognized AlterDatabaseSetStmt kind: %d",
varSetStmt->kind));
}
appendStringInfo(buf, ";");
}
static void
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
{
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
if (stmt->options)
{
ListCell *cell = NULL;
appendStringInfo(buf, "WITH OPTION ");
foreach(cell, stmt->options)
{
DefElem *def = castNode(DefElem, lfirst(cell));
appendStringInfo(buf, "%s %s", quote_identifier(def->defname),
quote_literal_cstr(strVal(def->arg)));
if (cell != list_tail(stmt->options))
{
appendStringInfo(buf, ", ");
}
}
}
appendStringInfo(buf, ";");
}
static void
AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt)
{
appendStringInfo(buf, "ALTER DATABASE %s SET ", quote_identifier(stmt->dbname));
VariableSetStmt *varSetStmt = castNode(VariableSetStmt, stmt->setstmt);
if (varSetStmt->kind == VAR_SET_VALUE)
{
appendStringInfo(buf, "%s = %s", quote_identifier(varSetStmt->name),
quote_literal_cstr(strVal(varSetStmt->args)));
}
else if (varSetStmt->kind == VAR_RESET_ALL)
{
appendStringInfo(buf, "RESET ALL");
}
else if (varSetStmt->kind == VAR_RESET)
{
appendStringInfo(buf, "RESET %s", quote_identifier(varSetStmt->name));
}
else
{
ereport(ERROR,
errmsg("unrecognized AlterDatabaseSetStmt kind: %d",
varSetStmt->kind));
}
appendStringInfo(buf, ";");
}
char * char *
DeparseGrantOnDatabaseStmt(Node *node) DeparseGrantOnDatabaseStmt(Node *node)
{ {
@ -95,3 +212,61 @@ DeparseGrantOnDatabaseStmt(Node *node)
return str.data; return str.data;
} }
char *
DeparseAlterDatabaseStmt(Node *node)
{
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
AppendAlterDatabaseStmt(&str, stmt);
return str.data;
}
char *
DeparseAlterDatabaseSetStmt(Node *node)
{
AlterDatabaseSetStmt *stmt = castNode(AlterDatabaseSetStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
AppendAlterDatabaseSetStmt(&str, stmt);
return str.data;
}
char *
DeparseAlterDatabaseRenameStmt(Node *node)
{
RenameStmt *stmt = (RenameStmt *) node;
StringInfoData str;
initStringInfo(&str);
appendStringInfo(&str, "ALTER DATABASE %s RENAME TO %s;", quote_identifier(
stmt->subname), quote_identifier(stmt->newname));
return str.data;
}
char *
DeparseAlterDatabaseRefreshCollStmt(Node *node)
{
AlterDatabaseRefreshCollStmt *stmt = (AlterDatabaseRefreshCollStmt *) node;
StringInfoData str;
initStringInfo(&str);
appendStringInfo(&str, "ALTER DATABASE %s REFRESH COLLATION;", quote_identifier(
stmt->dbname));
return str.data;
}

View File

@ -223,6 +223,21 @@ extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address);
extern List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, extern List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
/* domain.c - forward declarations */ /* domain.c - forward declarations */
extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess); isPostprocess);

View File

@ -223,6 +223,10 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
/* forward declarations for deparse_database_stmts.c */ /* forward declarations for deparse_database_stmts.c */
extern char * DeparseAlterDatabaseOwnerStmt(Node *node); extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
extern char * DeparseGrantOnDatabaseStmt(Node *node); extern char * DeparseGrantOnDatabaseStmt(Node *node);
extern char * DeparseAlterDatabaseStmt(Node *node);
extern char * DeparseAlterDatabaseSetStmt(Node *node);
extern char * DeparseAlterDatabaseRenameStmt(Node *node);
extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
/* forward declaration for deparse_publication_stmts.c */ /* forward declaration for deparse_publication_stmts.c */
extern char * DeparseCreatePublicationStmt(Node *stmt); extern char * DeparseCreatePublicationStmt(Node *stmt);