From 2d9da93d3d7678058a474ea915e370a8e2e85335 Mon Sep 17 00:00:00 2001 From: gindibay Date: Wed, 11 Oct 2023 17:28:40 +0300 Subject: [PATCH] Adds connection limit --- src/backend/distributed/commands/database.c | 40 ++++++++++++++- .../deparser/deparse_database_stmts.c | 51 ++++++++++--------- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 70fa871a0..4ff4c9eec 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -184,11 +184,29 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString, char *sql = DeparseTreeNode((Node *) stmt); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, + if (strstr(sql, "SELECT pg_catalog.citus_internal_database_command") != NULL) { + List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + RowShareLock); + if (list_length(workerNodes) > 0) + { + bool outsideTransaction = false; + + List *taskList = CreateDDLTaskList(sql, workerNodes, + outsideTransaction); + + bool localExecutionSupported = false; + ExecuteUtilityTaskList(taskList, localExecutionSupported); + return NIL; + } + }else{ + List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + } + return NIL; + } @@ -371,6 +389,10 @@ citus_internal_database_command(PG_FUNCTION_ARGS) (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, GUC_ACTION_LOCAL, true, 0, false); + set_config_option("synchronous_commit", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + if (IsA(parseTree, CreatedbStmt)) { CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); @@ -396,6 +418,20 @@ citus_internal_database_command(PG_FUNCTION_ARGS) DropDatabase(NULL, (DropdbStmt *) parseTree); } } + else if (IsA(parseTree, AlterDatabaseStmt)) + { + elog(DEBUG1, "Altering DB"); + AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, parseTree); + + bool missingOk = false; + Oid databaseOid = get_database_oid(stmt->dbname, missingOk); + + + if (OidIsValid(databaseOid)) + { + AlterDatabase(NULL, (AlterDatabaseStmt *) parseTree,true); + } + } else { ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree)))); diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 5365f072d..b3c7e6c64 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -28,7 +28,6 @@ static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); -static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); const struct option_format create_database_option_formats[] = { { "template", " TEMPLATE %s", "string" }, @@ -49,6 +48,13 @@ const struct option_format create_database_option_formats[] = { { "oid", " OID %d", "object_id" }, }; + +const struct option_format alter_database_option_formats[] = { + { "is_template", " IS_TEMPLATE %s", "boolean" }, + { "allow_connections", " ALLOW_CONNECTIONS %s", "boolean" }, + { "connection_limit", " CONNECTION_LIMIT %d", "integer" }, +}; + char * DeparseAlterDatabaseOwnerStmt(Node *node) { @@ -106,45 +112,40 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt) AppendGrantSharedSuffix(buf, stmt); } - static void -AppendDefElemConnLimit(StringInfo buf, DefElem *def) -{ - appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def)); +AppendBasicAlterDatabaseOptions(StringInfo buf,DefElem *def, bool prefix_appended_for_basic_options, char *dbname ){ + if(!prefix_appended_for_basic_options){ + appendStringInfo(buf, "ALTER DATABASE %s WITH ", quote_identifier(dbname)); + prefix_appended_for_basic_options = true; + } + optionToStatement(buf, def, alter_database_option_formats, lengthof( + alter_database_option_formats)); } +static void +AppendAlterDatabaseSetTablespace(StringInfo buf,DefElem *def, char *dbname ){ + appendStringInfo(buf, + "SELECT pg_catalog.citus_internal_database_command('ALTER DATABASE %s SET TABLESPACE %s')", + quote_identifier(dbname),quote_identifier(defGetString(def))); +} static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt) { - appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname)); - if (stmt->options) { ListCell *cell = NULL; - appendStringInfo(buf, "WITH "); + bool prefix_appended_for_basic_options = false; foreach(cell, stmt->options) { DefElem *def = castNode(DefElem, lfirst(cell)); - if (strcmp(def->defname, "is_template") == 0) + if (strcmp(def->defname,"tablespace") == 0) { - appendStringInfo(buf, "IS_TEMPLATE %s", - quote_literal_cstr(strVal(def->arg))); + AppendAlterDatabaseSetTablespace(buf,def,stmt->dbname); + break; } - else if (strcmp(def->defname, "connection_limit") == 0) - { - AppendDefElemConnLimit(buf, def); - } - else if (strcmp(def->defname, "allow_connections") == 0) - { - ereport(ERROR, - errmsg("ALLOW_CONNECTIONS is not supported")); - } - else - { - ereport(ERROR, - errmsg("unrecognized ALTER DATABASE option: %s", - def->defname)); + else{ + AppendBasicAlterDatabaseOptions(buf,def,prefix_appended_for_basic_options,stmt->dbname); } } }