mirror of https://github.com/citusdata/citus.git
Adds alter database propagation - with and refresh collation (#7172)
DESCRIPTION: Adds ALTER DATABASE WITH ... and REFRESH COLLATION VERSION support This PR adds supports for basic ALTER DATABASE statements propagation support. Below statements are supported: ALTER DATABASE <database_name> with IS_TEMPLATE <true/false>; ALTER DATABASE <database_name> with CONNECTION LIMIT <integer_value>; ALTER DATABASE <database_name> REFRESH COLLATION VERSION; --------- Co-authored-by: Jelte Fennema-Nio <jelte.fennema@microsoft.com>pull/7195/head^2
parent
1da99f8423
commit
e5e64b7454
|
@ -147,3 +147,68 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
|||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
||||
* postgres instance.
|
||||
*
|
||||
* In this stage we can prepare the commands that need to be run on all workers to grant
|
||||
* on databases.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagate())
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
char *sql = DeparseTreeNode((Node *) stmt);
|
||||
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
||||
/*
|
||||
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
|
||||
* postgres instance.
|
||||
*
|
||||
* In this stage we can prepare the commands that need to be run on all workers to grant
|
||||
* on databases.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagate())
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
AlterDatabaseRefreshCollStmt *stmt = castNode(AlterDatabaseRefreshCollStmt, node);
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
char *sql = DeparseTreeNode((Node *) stmt);
|
||||
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
|
|
@ -444,6 +444,30 @@ static DistributeObjectOps Database_Grant = {
|
|||
.markDistributed = false,
|
||||
};
|
||||
|
||||
static DistributeObjectOps Database_Alter = {
|
||||
.deparse = DeparseAlterDatabaseStmt,
|
||||
.qualify = NULL,
|
||||
.preprocess = PreprocessAlterDatabaseStmt,
|
||||
.postprocess = NULL,
|
||||
.objectType = OBJECT_DATABASE,
|
||||
.operationType = DIST_OPS_ALTER,
|
||||
.address = NULL,
|
||||
.markDistributed = false,
|
||||
};
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
static DistributeObjectOps Database_RefreshColl = {
|
||||
.deparse = DeparseAlterDatabaseRefreshCollStmt,
|
||||
.qualify = NULL,
|
||||
.preprocess = PreprocessAlterDatabaseRefreshCollStmt,
|
||||
.postprocess = NULL,
|
||||
.objectType = OBJECT_DATABASE,
|
||||
.operationType = DIST_OPS_ALTER,
|
||||
.address = NULL,
|
||||
.markDistributed = false,
|
||||
};
|
||||
#endif
|
||||
|
||||
static DistributeObjectOps Domain_Alter = {
|
||||
.deparse = DeparseAlterDomainStmt,
|
||||
.qualify = QualifyAlterDomainStmt,
|
||||
|
@ -1272,7 +1296,6 @@ static DistributeObjectOps Trigger_Rename = {
|
|||
.markDistributed = false,
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* GetDistributeObjectOps looks up the DistributeObjectOps which handles the node.
|
||||
*
|
||||
|
@ -1283,6 +1306,18 @@ GetDistributeObjectOps(Node *node)
|
|||
{
|
||||
switch (nodeTag(node))
|
||||
{
|
||||
case T_AlterDatabaseStmt:
|
||||
{
|
||||
return &Database_Alter;
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
case T_AlterDatabaseRefreshCollStmt:
|
||||
{
|
||||
return &Database_RefreshColl;
|
||||
}
|
||||
|
||||
#endif
|
||||
case T_AlterDomainStmt:
|
||||
{
|
||||
return &Domain_Alter;
|
||||
|
|
|
@ -20,9 +20,12 @@
|
|||
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/log_utils.h"
|
||||
|
||||
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
|
||||
|
||||
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
|
||||
|
||||
char *
|
||||
DeparseAlterDatabaseOwnerStmt(Node *node)
|
||||
|
@ -82,6 +85,52 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
|
||||
{
|
||||
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def));
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||
{
|
||||
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
|
||||
|
||||
if (stmt->options)
|
||||
{
|
||||
ListCell *cell = NULL;
|
||||
appendStringInfo(buf, "WITH ");
|
||||
foreach(cell, stmt->options)
|
||||
{
|
||||
DefElem *def = castNode(DefElem, lfirst(cell));
|
||||
if (strcmp(def->defname, "is_template") == 0)
|
||||
{
|
||||
appendStringInfo(buf, "IS_TEMPLATE %s",
|
||||
quote_literal_cstr(strVal(def->arg)));
|
||||
}
|
||||
else if (strcmp(def->defname, "connection_limit") == 0)
|
||||
{
|
||||
AppendDefElemConnLimit(buf, def);
|
||||
}
|
||||
else if (strcmp(def->defname, "allow_connections") == 0)
|
||||
{
|
||||
ereport(ERROR,
|
||||
errmsg("ALLOW_CONNECTIONS is not supported"));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR,
|
||||
errmsg("unrecognized ALTER DATABASE option: %s",
|
||||
def->defname));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
appendStringInfo(buf, ";");
|
||||
}
|
||||
|
||||
|
||||
char *
|
||||
DeparseGrantOnDatabaseStmt(Node *node)
|
||||
{
|
||||
|
@ -95,3 +144,37 @@ DeparseGrantOnDatabaseStmt(Node *node)
|
|||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
char *
|
||||
DeparseAlterDatabaseStmt(Node *node)
|
||||
{
|
||||
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
||||
|
||||
StringInfoData str = { 0 };
|
||||
initStringInfo(&str);
|
||||
|
||||
AppendAlterDatabaseStmt(&str, stmt);
|
||||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
char *
|
||||
DeparseAlterDatabaseRefreshCollStmt(Node *node)
|
||||
{
|
||||
AlterDatabaseRefreshCollStmt *stmt = (AlterDatabaseRefreshCollStmt *) node;
|
||||
|
||||
StringInfoData str;
|
||||
initStringInfo(&str);
|
||||
|
||||
appendStringInfo(&str, "ALTER DATABASE %s REFRESH COLLATION VERSION;",
|
||||
quote_identifier(
|
||||
stmt->dbname));
|
||||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
|
|
@ -223,6 +223,14 @@ extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address);
|
|||
extern List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
extern List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
||||
extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
|
||||
|
||||
/* domain.c - forward declarations */
|
||||
extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool
|
||||
isPostprocess);
|
||||
|
|
|
@ -223,6 +223,8 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
|
|||
/* forward declarations for deparse_database_stmts.c */
|
||||
extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
|
||||
extern char * DeparseGrantOnDatabaseStmt(Node *node);
|
||||
extern char * DeparseAlterDatabaseStmt(Node *node);
|
||||
extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
|
||||
|
||||
/* forward declaration for deparse_publication_stmts.c */
|
||||
extern char * DeparseCreatePublicationStmt(Node *stmt);
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
-- since ALLOW_CONNECTIONS alter option should be executed in a different database
|
||||
-- and since we don't have a multiple database support for now,
|
||||
-- this statement will get error
|
||||
alter database regression ALLOW_CONNECTIONS false;
|
||||
ERROR: ALLOW_CONNECTIONS is not supported
|
||||
alter database regression with CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with CONNECTION LIMIT -1;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE true;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE false;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
-- this statement will get error since we don't have a multiple database support for now
|
||||
alter database regression rename to regression2;
|
||||
ERROR: current database cannot be renamed
|
||||
set citus.log_remote_commands = false;
|
|
@ -1523,6 +1523,13 @@ ORDER BY is_coordinator DESC, result;
|
|||
f | [{"indexdefs": ["CREATE UNIQUE INDEX referencing__key ON pg15.referencing USING btree (test_2)"], "indexnames": ["referencing__key"]}, {"indexdefs": ["CREATE UNIQUE INDEX referencing__key1 ON pg15.referencing USING btree (test_3) NULLS NOT DISTINCT"], "indexnames": ["referencing__key1"]}]
|
||||
(3 rows)
|
||||
|
||||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
alter database regression REFRESH COLLATION VERSION;
|
||||
NOTICE: version has not changed
|
||||
NOTICE: issuing ALTER DATABASE regression REFRESH COLLATION VERSION;
|
||||
NOTICE: issuing ALTER DATABASE regression REFRESH COLLATION VERSION;
|
||||
set citus.log_remote_commands = false;
|
||||
-- Clean up
|
||||
\set VERBOSITY terse
|
||||
SET client_min_messages TO ERROR;
|
||||
|
|
|
@ -51,6 +51,7 @@ test: multi_metadata_attributes
|
|||
test: multi_read_from_secondaries
|
||||
|
||||
test: grant_on_database_propagation
|
||||
test: alter_database_propagation
|
||||
|
||||
# ----------
|
||||
# multi_citus_tools tests utility functions written for citus tools
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
|
||||
-- since ALLOW_CONNECTIONS alter option should be executed in a different database
|
||||
-- and since we don't have a multiple database support for now,
|
||||
-- this statement will get error
|
||||
alter database regression ALLOW_CONNECTIONS false;
|
||||
|
||||
|
||||
alter database regression with CONNECTION LIMIT 100;
|
||||
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
alter database regression with CONNECTION LIMIT -1;
|
||||
alter database regression with IS_TEMPLATE true;
|
||||
alter database regression with IS_TEMPLATE false;
|
||||
-- this statement will get error since we don't have a multiple database support for now
|
||||
alter database regression rename to regression2;
|
||||
|
||||
set citus.log_remote_commands = false;
|
|
@ -965,6 +965,11 @@ SELECT (groupid = 0) AS is_coordinator, result FROM run_command_on_all_nodes(
|
|||
JOIN pg_dist_node USING (nodeid)
|
||||
ORDER BY is_coordinator DESC, result;
|
||||
|
||||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
alter database regression REFRESH COLLATION VERSION;
|
||||
set citus.log_remote_commands = false;
|
||||
|
||||
-- Clean up
|
||||
\set VERBOSITY terse
|
||||
SET client_min_messages TO ERROR;
|
||||
|
|
Loading…
Reference in New Issue