Adds connection limit

pull/7253/head
gindibay 2023-10-11 17:28:40 +03:00
parent f7f768168b
commit 2d9da93d3d
2 changed files with 64 additions and 27 deletions

View File

@ -184,11 +184,29 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
char *sql = DeparseTreeNode((Node *) stmt);
if (strstr(sql, "SELECT pg_catalog.citus_internal_database_command") != NULL) {
List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
if (list_length(workerNodes) > 0)
{
bool outsideTransaction = false;
List *taskList = CreateDDLTaskList(sql, workerNodes,
outsideTransaction);
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
return NIL;
}
}else{
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
return NIL;
}
@ -371,6 +389,10 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
set_config_option("synchronous_commit", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
if (IsA(parseTree, CreatedbStmt))
{
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
@ -396,6 +418,20 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
DropDatabase(NULL, (DropdbStmt *) parseTree);
}
}
else if (IsA(parseTree, AlterDatabaseStmt))
{
elog(DEBUG1, "Altering DB");
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, parseTree);
bool missingOk = false;
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
if (OidIsValid(databaseOid))
{
AlterDatabase(NULL, (AlterDatabaseStmt *) parseTree,true);
}
}
else
{
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));

View File

@ -28,7 +28,6 @@
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendDefElemConnLimit(StringInfo buf, DefElem *def);
const struct option_format create_database_option_formats[] = {
{ "template", " TEMPLATE %s", "string" },
@ -49,6 +48,13 @@ const struct option_format create_database_option_formats[] = {
{ "oid", " OID %d", "object_id" },
};
const struct option_format alter_database_option_formats[] = {
{ "is_template", " IS_TEMPLATE %s", "boolean" },
{ "allow_connections", " ALLOW_CONNECTIONS %s", "boolean" },
{ "connection_limit", " CONNECTION_LIMIT %d", "integer" },
};
char *
DeparseAlterDatabaseOwnerStmt(Node *node)
{
@ -106,45 +112,40 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
AppendGrantSharedSuffix(buf, stmt);
}
static void
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
{
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def));
AppendBasicAlterDatabaseOptions(StringInfo buf,DefElem *def, bool prefix_appended_for_basic_options, char *dbname ){
if(!prefix_appended_for_basic_options){
appendStringInfo(buf, "ALTER DATABASE %s WITH ", quote_identifier(dbname));
prefix_appended_for_basic_options = true;
}
optionToStatement(buf, def, alter_database_option_formats, lengthof(
alter_database_option_formats));
}
static void
AppendAlterDatabaseSetTablespace(StringInfo buf,DefElem *def, char *dbname ){
appendStringInfo(buf,
"SELECT pg_catalog.citus_internal_database_command('ALTER DATABASE %s SET TABLESPACE %s')",
quote_identifier(dbname),quote_identifier(defGetString(def)));
}
static void
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
{
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
if (stmt->options)
{
ListCell *cell = NULL;
appendStringInfo(buf, "WITH ");
bool prefix_appended_for_basic_options = false;
foreach(cell, stmt->options)
{
DefElem *def = castNode(DefElem, lfirst(cell));
if (strcmp(def->defname, "is_template") == 0)
if (strcmp(def->defname,"tablespace") == 0)
{
appendStringInfo(buf, "IS_TEMPLATE %s",
quote_literal_cstr(strVal(def->arg)));
AppendAlterDatabaseSetTablespace(buf,def,stmt->dbname);
break;
}
else if (strcmp(def->defname, "connection_limit") == 0)
{
AppendDefElemConnLimit(buf, def);
}
else if (strcmp(def->defname, "allow_connections") == 0)
{
ereport(ERROR,
errmsg("ALLOW_CONNECTIONS is not supported"));
}
else
{
ereport(ERROR,
errmsg("unrecognized ALTER DATABASE option: %s",
def->defname));
else{
AppendBasicAlterDatabaseOptions(buf,def,prefix_appended_for_basic_options,stmt->dbname);
}
}
}