From 65660db10dc3a587db6bfc1bb6273302b521ffc8 Mon Sep 17 00:00:00 2001 From: gindibay Date: Wed, 8 Nov 2023 02:02:00 +0300 Subject: [PATCH] Fixes review items --- src/backend/distributed/commands/common.c | 4 +- src/backend/distributed/commands/database.c | 199 ++++++++---------- src/backend/distributed/commands/role.c | 1 + .../distributed/commands/utility_hook.c | 32 +-- .../distributed/deparser/citus_deparseutils.c | 48 +++-- .../deparser/deparse_database_stmts.c | 95 +++++++-- src/backend/distributed/metadata/distobject.c | 15 +- .../distributed/metadata/metadata_sync.c | 80 ++++++- src/backend/distributed/shared_library_init.c | 2 +- .../12.2-1.sql | 2 +- .../latest.sql | 2 +- src/include/distributed/commands.h | 12 +- .../distributed/commands/utility_hook.h | 2 +- src/include/distributed/deparser.h | 16 +- src/include/distributed/metadata/distobject.h | 3 +- src/test/regress/citus_tests/run_test.py | 2 - 16 files changed, 310 insertions(+), 205 deletions(-) diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index 9a87df9f1..957e26161 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -14,8 +14,10 @@ #include "postgres.h" #include "catalog/objectaddress.h" +#include "catalog/pg_database.h" #include "catalog/pg_ts_config.h" #include "catalog/pg_ts_dict.h" +#include "commands/dbcommands.h" #include "nodes/parsenodes.h" #include "tcop/utility.h" @@ -28,8 +30,6 @@ #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" -#include "catalog/pg_database.h" -#include "commands/dbcommands.h" /* diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index ceac576af..cabe69fd6 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -10,46 +10,53 @@ */ #include "postgres.h" +#include "miscadmin.h" +#include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/objectaddress.h" +#include "catalog/pg_collation.h" #include "catalog/pg_database.h" +#include "catalog/pg_database_d.h" +#include "catalog/pg_tablespace.h" #include "commands/dbcommands.h" -#include "miscadmin.h" #include "nodes/parsenodes.h" -#include "utils/syscache.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/relcache.h" +#include "utils/syscache.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" +#include "distributed/deparse_shard_query.h" #include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" #include "distributed/metadata_utility.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" -#include "distributed/worker_transaction.h" -#include "distributed/deparser.h" #include "distributed/worker_protocol.h" -#include "distributed/metadata/distobject.h" -#include "distributed/deparse_shard_query.h" -#include "distributed/listutils.h" -#include "distributed/adaptive_executor.h" -#include "access/htup_details.h" -#include "catalog/pg_tablespace.h" -#include "access/heapam.h" -#include "utils/relcache.h" -#include "utils/rel.h" -#include "utils/lsyscache.h" -#include "catalog/pg_collation.h" -#include "utils/relcache.h" -#include "catalog/pg_database_d.h" +#include "distributed/worker_transaction.h" +/* + * DatabaseCollationInfo is used to store collation related information of a database + */ +typedef struct DatabaseCollationInfo +{ + char *collation; + char *ctype; + #if PG_VERSION_NUM >= PG_VERSION_15 + char *icu_locale; + char *collversion; + #endif +} DatabaseCollationInfo; + static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); - - -PG_FUNCTION_INFO_V1(citus_internal_database_command); static Oid get_database_owner(Oid db_oid); List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); @@ -264,6 +271,13 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, } +/* + * PostprocessAlterDatabaseStmt is executed before the statement is applied to the local + * postgres instance. + * + * In this stage, we can perform validations and prepare the commands that need to + * be run on all workers to grant. + */ List * PreprocessCreateDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) @@ -304,82 +318,21 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) (void *) createDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands); } /* - * citus_internal_database_command is an internal UDF to - * create/drop a database in an idempotent maner without - * transaction block restrictions. + * PostprocessAlterDatabaseStmt is executed after the statement is applied to the local + * postgres instance. In this stage we can prepare the commands that need to be run on + * all workers to drop the database. Since the DROP DATABASE statement gives error in + * transaction context, we need to use NontransactionalNodeDDLTaskList to send the + * DROP DATABASE statement to the workers. */ -Datum -citus_internal_database_command(PG_FUNCTION_ARGS) -{ - int saveNestLevel = NewGUCNestLevel(); - text *commandText = PG_GETARG_TEXT_P(0); - char *command = text_to_cstring(commandText); - Node *parseTree = ParseTreeNode(command); - - set_config_option("citus.enable_ddl_propagation", "off", - (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); - - set_config_option("citus.enable_create_database_propagation", "off", - (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); - - /* - * createdb() / DropDatabase() uses ParseState to report the error position for the - * input command and the position is reported to be 0 when it's provided as NULL. - * We're okay with that because we don't expect this UDF to be called with an incorrect - * DDL command. - * - */ - ParseState *pstate = NULL; - - if (IsA(parseTree, CreatedbStmt)) - { - CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); - - bool missingOk = true; - Oid databaseOid = get_database_oid(stmt->dbname, missingOk); - - if (!OidIsValid(databaseOid)) - { - createdb(pstate, (CreatedbStmt *) parseTree); - } - } - else if (IsA(parseTree, DropdbStmt)) - { - DropdbStmt *stmt = castNode(DropdbStmt, parseTree); - - bool missingOk = false; - Oid databaseOid = get_database_oid(stmt->dbname, missingOk); - - - if (OidIsValid(databaseOid)) - { - DropDatabase(pstate, (DropdbStmt *) parseTree); - } - } - else - { - ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree)))); - } - - /* Below command rollbacks flags to the state before this session*/ - AtEOXact_GUC(true, saveNestLevel); - - PG_RETURN_VOID(); -} - - List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { - bool isPostProcess = false; if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; @@ -389,41 +342,50 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, DropdbStmt *stmt = (DropdbStmt *) node; + bool isPostProcess = false; List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, isPostProcess); - if (list_length(addresses) == 0) + if (list_length(addresses) != 1) { - return NIL; + ereport(ERROR, (errmsg("unexpected number of objects found when " + "executing DROP DATABASE command"))); } ObjectAddress *address = (ObjectAddress *) linitial(addresses); - if (address->objectId == InvalidOid || !IsObjectDistributed(address)) + if (address->objectId == InvalidOid || !IsAnyObjectDistributed(list_make1(address))) { return NIL; } char *dropDatabaseCommand = DeparseTreeNode(node); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) dropDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands); } +/* + * GetDatabaseAddressFromDatabaseName gets the database name and returns the ObjectAddress + * of the database. + */ static ObjectAddress * GetDatabaseAddressFromDatabaseName(char *databaseName, bool missingOk) { Oid databaseOid = get_database_oid(databaseName, missingOk); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); - return dbAddress; + ObjectAddress *dbObjectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbObjectAddress, DatabaseRelationId, databaseOid); + return dbObjectAddress; } +/* + * DropDatabaseStmtObjectAddress gets the ObjectAddress of the database that is the + * object of the DropdbStmt. + */ List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) { @@ -434,6 +396,10 @@ DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) } +/* + * CreateDatabaseStmtObjectAddress gets the ObjectAddress of the database that is the + * object of the CreatedbStmt. + */ List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) { @@ -444,6 +410,9 @@ CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) } +/* + * GetTablespaceName gets the tablespace oid and returns the tablespace name. + */ static char * GetTablespaceName(Oid tablespaceOid) { @@ -462,19 +431,6 @@ GetTablespaceName(Oid tablespaceOid) } -/* - * DatabaseCollationInfo is used to store collation related information of a database - */ -typedef struct DatabaseCollationInfo -{ - char *collation; - char *ctype; - #if PG_VERSION_NUM >= PG_VERSION_15 - char *icu_locale; - char *collversion; - #endif -} DatabaseCollationInfo; - /* * GetDatabaseCollation gets oid of a database and returns all the collation related information * We need this method since collation related info in Form_pg_database is not accessible @@ -548,6 +504,9 @@ GetDatabaseCollation(Oid db_oid) } +/* + * FreeDatabaseCollationInfo frees the memory allocated for DatabaseCollationInfo + */ static void FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo) { @@ -569,8 +528,13 @@ FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo) #if PG_VERSION_NUM >= PG_VERSION_15 + +/* + * GetLocaleProviderString gets the datlocprovider stored in pg_database + * and returns the string representation of the datlocprovider + */ static char * -get_locale_provider_string(char datlocprovider) +GetLocaleProviderString(char datlocprovider) { switch (datlocprovider) { @@ -599,7 +563,8 @@ get_locale_provider_string(char datlocprovider) /* - * GenerateCreateDatabaseStatementFromPgDatabase is gets the pg_database tuple and returns the CREATE DATABASE statement + * GenerateCreateDatabaseStatementFromPgDatabase gets the pg_database tuple and returns the + * CREATE DATABASE statement that can be used to create given database. */ static char * GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) @@ -642,7 +607,7 @@ GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) if (databaseForm->datlocprovider != 0) { - appendStringInfo(&str, " LOCALE_PROVIDER = '%s'", get_locale_provider_string( + appendStringInfo(&str, " LOCALE_PROVIDER = '%s'", GetLocaleProviderString( databaseForm->datlocprovider)); } @@ -678,19 +643,21 @@ GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) /* - * GenerateCreateDatabaseCommandList is gets the pg_database tuples and returns the CREATE DATABASE statement list - * for all the databases in the cluster.citus_internal_database_command UDF is used to send the CREATE DATABASE - * statement to the workers since the CREATE DATABASE statement gives error in transaction context. + * GenerateCreateDatabaseCommandList gets a list of pg_database tuples and returns + * a list of CREATE DATABASE statements for all the databases. + * + * Commands in the list are wrapped by citus_internal_database_command() UDF + * to avoid from transaction block restrictions that apply to database commands */ List * GenerateCreateDatabaseCommandList(void) { List *commands = NIL; - HeapTuple tuple; Relation pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock); TableScanDesc scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL); + HeapTuple tuple = NULL; while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple); @@ -702,7 +669,7 @@ GenerateCreateDatabaseCommandList(void) /* Generate the CREATE DATABASE statement */ appendStringInfo(outerDbStmt, - "select pg_catalog.citus_internal_database_command( %s)", + "SELECT pg_catalog.citus_internal_database_command( %s)", quote_literal_cstr( createStmt)); diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 792efd934..976cacf89 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -65,6 +65,7 @@ static DefElem * makeDefElemBool(char *name, bool value); static List * GenerateRoleOptionsList(HeapTuple tuple); static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options); static List * GenerateGrantRoleStmtsOfRole(Oid roleid); +static void EnsureSequentialModeForRoleDDL(void); static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 0d400d139..150d13676 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -25,7 +25,6 @@ *------------------------------------------------------------------------- */ -#include "distributed/pg_version_constants.h" #include "postgres.h" #include "miscadmin.h" @@ -35,6 +34,7 @@ #include "access/htup_details.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/pg_database.h" #include "citus_version.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -62,6 +62,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/multi_physical_planner.h" +#include "distributed/pg_version_constants.h" #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" @@ -80,7 +81,6 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" -#include "catalog/pg_database.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -579,7 +579,6 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt, PreprocessLockStatement((LockStmt *) parsetree, context); } - /* * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * and distribute the partition if necessary. @@ -726,12 +725,13 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt, } /* - * Make sure that dropping the role and database deletes the pg_dist_object entries. There is a - * separate logic for roles and database, since roles and database are not included as dropped objects in the - * drop event trigger. To handle it both on worker and coordinator nodes, it is not - * implemented as a part of process functions but here. + * Make sure that dropping node-wide objects deletes the pg_dist_object + * entries. There is a separate logic for node-wide objects (such as role + * and databases), since they are not included as dropped objects in the + * drop event trigger. To handle it both on worker and coordinator nodes, + * it is not implemented as a part of process functions but here. */ - UnmarkRolesAndDatabaseDistributed(parsetree); + UnmarkNodeWideObjectsDistributed(parsetree); pstmt->utilityStmt = parsetree; @@ -1265,10 +1265,12 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { ereport(WARNING, (errmsg( - "Commands that are not transaction-safe may result in partial failure" - ", potentially leading to an inconsistent state.\nIf the problematic command" - " is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the " - "object,\nif applicable, and then reattempt the original command."))); + "Commands that are not transaction-safe may result in " + "partial failure, potentially leading to an inconsistent " + "state.\nIf the problematic command is a CREATE operation, " + "consider using the 'IF EXISTS' syntax to drop the object," + "\nif applicable, and then re-attempt the original command."))); + PG_RE_THROW(); } } @@ -1483,12 +1485,12 @@ DDLTaskList(Oid relationId, const char *commandString) /* - * NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a + * NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a * given target set of nodes with cannotBeExecutedInTransaction is set to make sure - * that list is being executed without a transaction. + * that task list is executed outside a transaction block. */ List * -NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands) +NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands) { List *ddlJobs = NodeDDLTaskList(targets, commands); DDLJob *ddlJob = NULL; diff --git a/src/backend/distributed/deparser/citus_deparseutils.c b/src/backend/distributed/deparser/citus_deparseutils.c index 6492c14f2..0cfd7dd6f 100644 --- a/src/backend/distributed/deparser/citus_deparseutils.c +++ b/src/backend/distributed/deparser/citus_deparseutils.c @@ -1,56 +1,66 @@ - +/* + * citus_deparseutils.c + * --------------------- + * + * This file contains common functions used for deparsing PostgreSQL statements + * to their equivalent SQL representation. + * + */ #include "postgres.h" -#include "utils/builtins.h" + #include "commands/defrem.h" +#include "distributed/deparser.h" +#include "distributed/pg_version_constants.h" +#include "utils/builtins.h" #include "utils/elog.h" #include "utils/rel.h" #include "utils/relcache.h" #include "utils/syscache.h" #include "utils/typcache.h" -#include "distributed/deparser.h" -#include "distributed/pg_version_constants.h" /** - * Convert a DefElem option to a SQL statement and append it to the given StringInfo buffer. + * DefElemOptionToStatement converts a DefElem option to a SQL statement and + * appends it to the given StringInfo buffer. * * @param buf The StringInfo buffer to append the SQL statement to. * @param option The DefElem option to convert to a SQL statement. - * @param opt_formats The option format specification to use for the conversion. - * @param num_opt_formats The number of option formats in the opt_formats array. + * @param optionFormats The option format specification to use for the conversion. + * @param optionFormatsLen The number of option formats in the opt_formats array. */ void -optionToStatement(StringInfo buf, DefElem *option, const struct - option_format *opt_formats, int - opt_formats_len) +DefElemOptionToStatement(StringInfo buf, DefElem *option, const + DefElemOptionFormat *optionFormats, int + optionFormatsLen) { const char *name = option->defname; int i; - for (i = 0; i < opt_formats_len; i++) + for (i = 0; i < optionFormatsLen; i++) { - if (strcmp(name, opt_formats[i].name) == 0) + if (strcmp(name, optionFormats[i].name) == 0) { - switch (opt_formats[i].type) + switch (optionFormats[i].type) { case OPTION_FORMAT_STRING: { char *value = defGetString(option); - appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); + appendStringInfo(buf, optionFormats[i].format, quote_identifier( + value)); break; } case OPTION_FORMAT_INTEGER: { int32 value = defGetInt32(option); - appendStringInfo(buf, opt_formats[i].format, value); + appendStringInfo(buf, optionFormats[i].format, value); break; } case OPTION_FORMAT_BOOLEAN: { bool value = defGetBoolean(option); - appendStringInfo(buf, opt_formats[i].format, value ? "true" : + appendStringInfo(buf, optionFormats[i].format, value ? "true" : "false"); break; } @@ -59,7 +69,7 @@ optionToStatement(StringInfo buf, DefElem *option, const struct case OPTION_FORMAT_OBJECT_ID: { Oid value = defGetObjectId(option); - appendStringInfo(buf, opt_formats[i].format, value); + appendStringInfo(buf, optionFormats[i].format, value); break; } @@ -67,14 +77,14 @@ optionToStatement(StringInfo buf, DefElem *option, const struct case OPTION_FORMAT_LITERAL_CSTR: { char *value = defGetString(option); - appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr( + appendStringInfo(buf, optionFormats[i].format, quote_literal_cstr( value)); break; } default: { - elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); + elog(ERROR, "unrecognized option type: %d", optionFormats[i].type); break; } } diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index e286dae65..e96015fdc 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -12,25 +12,25 @@ #include "postgres.h" #include "pg_version_compat.h" - #include "catalog/namespace.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "utils/builtins.h" -#include "distributed/deparser.h" -#include "distributed/citus_ruleutils.h" #include "commands/defrem.h" #include "distributed/deparser.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" #include "distributed/log_utils.h" #include "parser/parse_type.h" -#include "distributed/listutils.h" + static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); -const struct option_format create_database_option_formats[] = { +const DefElemOptionFormat create_database_option_formats[] = { { "owner", " OWNER %s", OPTION_FORMAT_STRING }, { "template", " TEMPLATE %s", OPTION_FORMAT_STRING }, { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR }, @@ -49,6 +49,22 @@ const struct option_format create_database_option_formats[] = { { "oid", " OID %d", OPTION_FORMAT_OBJECT_ID } }; +/* + * DeparseAlterDatabaseOwnerStmt + * Deparse an AlterDatabaseOwnerStmt node + * + * This function is responsible for producing a string representation of an + * AlterDatabaseOwnerStmt node, which represents an ALTER DATABASE statement + * that changes the owner of a database. The output string includes the ALTER + * DATABASE keyword, the name of the database being altered, and the new owner + * of the database. + * + * Parameters: + * - node: a pointer to the AlterDatabaseOwnerStmt node to be deparsed + * + * Returns: + * - a string representation of the ALTER DATABASE statement + */ char * DeparseAlterDatabaseOwnerStmt(Node *node) { @@ -64,6 +80,15 @@ DeparseAlterDatabaseOwnerStmt(Node *node) } +/* + * + * AppendAlterDatabaseOwnerStmt + * Append an ALTER DATABASE statement for changing the owner of a database to the given StringInfo buffer. + * + * Parameters: + * - buf: The StringInfo buffer to append the statement to. + * - stmt: The AlterOwnerStmt representing the ALTER DATABASE statement to append. + */ static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) { @@ -226,6 +251,32 @@ DeparseAlterDatabaseSetStmt(Node *node) } +/* + * Validates for if option is template, lc_type, locale or lc_collate, propagation will + * not be supported since template and strategy options are not stored in the catalog + * and lc_type, locale and lc_collate options depends on template parameter. + */ +static void +ValidateCreateDatabaseOptions(DefElem *option) +{ + if (strcmp(option->defname, "template") == 0 || + strcmp(option->defname, "strategy") == 0 || + strcmp(option->defname, "lc_ctype") == 0 || + strcmp(option->defname, "locale") == 0 || + strcmp(option->defname, "lc_collate") == 0 || + strcmp(option->defname, "icu_locale") == 0 || + strcmp(option->defname, "locale_provider") == 0) + { + ereport(ERROR, + errmsg("CREATE DATABASE option \"%s\" is not supported", + option->defname)); + } +} + + +/* + * Prepares a CREATE DATABASE statement with given empty StringInfo buffer and CreatedbStmt node. + */ static void AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) { @@ -237,27 +288,18 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) foreach_ptr(option, stmt->options) { - /*If option is template, lc_type, locale or lc_collate, propagation will not be supportted */ - /* since template database is not stored in the catalog */ - if (strcmp(option->defname, "template") == 0 || - strcmp(option->defname, "strategy") == 0 || - strcmp(option->defname, "lc_ctype") == 0 || - strcmp(option->defname, "locale") == 0 || - strcmp(option->defname, "lc_collate") == 0 || - strcmp(option->defname, "icu_locale") == 0 || - strcmp(option->defname, "locale_provider") == 0) - { - ereport(ERROR, - errmsg("CREATE DATABASE option \"%s\" is not supported", - option->defname)); - } + ValidateCreateDatabaseOptions(option); - optionToStatement(buf, option, create_database_option_formats, lengthof( - create_database_option_formats)); + DefElemOptionToStatement(buf, option, create_database_option_formats, + lengthof(create_database_option_formats)); } } +/* + * Converts a CreatedbStmt structure into a SQL command string. + * Used in the deparsing of Create database statement. + */ char * DeparseCreateDatabaseStmt(Node *node) { @@ -271,13 +313,16 @@ DeparseCreateDatabaseStmt(Node *node) } +/* + * Prepares a DROP DATABASE statement with given empty StringInfo buffer and DropdbStmt node. + */ static void AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt) { - char *if_exists_statement = stmt->missing_ok ? "IF EXISTS" : ""; + char *ifExistsStatement = stmt->missing_ok ? "IF EXISTS" : ""; appendStringInfo(buf, "DROP DATABASE %s %s", - if_exists_statement, + ifExistsStatement, quote_identifier(stmt->dbname)); DefElem *option = NULL; @@ -298,6 +343,10 @@ AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt) } +/* + * Converts a DropdbStmt structure into a SQL command string. + * Used in the deparsing of drop database statement. + */ char * DeparseDropDatabaseStmt(Node *node) { diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index af8354ee3..1a0b90f2c 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -55,6 +55,7 @@ static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); +static bool IsObjectDistributed(const ObjectAddress *address); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); @@ -358,8 +359,12 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, } +/* + * Deletes all pg_dist_object records for distributed roles in `DROP ROLE` statement a + * and for all databases in `DROP DATABASE` statement + */ void -UnmarkRolesAndDatabaseDistributed(Node *node) +UnmarkNodeWideObjectsDistributed(Node *node) { if (IsA(node, DropRoleStmt)) { @@ -378,9 +383,9 @@ UnmarkRolesAndDatabaseDistributed(Node *node) char *dbName = stmt->dbname; Oid dbOid = get_database_oid(dbName, stmt->missing_ok); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); - UnmarkObjectDistributed(dbAddress); + ObjectAddress *dbObjectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbObjectAddress, DatabaseRelationId, dbOid); + UnmarkObjectDistributed(dbObjectAddress); } } @@ -420,7 +425,7 @@ UnmarkObjectDistributed(const ObjectAddress *address) * IsObjectDistributed returns if the object addressed is already distributed in the * cluster. This performs a local indexed lookup in pg_dist_object. */ -bool +static bool IsObjectDistributed(const ObjectAddress *address) { ScanKeyData key[3]; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 54fa801ae..bc6797504 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -30,12 +30,15 @@ #include "catalog/pg_attrdef.h" #include "catalog/pg_collation.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_database.h" +#include "catalog/pg_database_d.h" #include "catalog/pg_depend.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_namespace.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/async.h" +#include "commands/dbcommands.h" #include "distributed/argutils.h" #include "distributed/backend_data.h" #include "distributed/citus_ruleutils.h" @@ -179,6 +182,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); +PG_FUNCTION_INFO_V1(citus_internal_database_command); static bool got_SIGTERM = false; @@ -3895,6 +3899,80 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS) } +/* + * citus_internal_database_command is an internal UDF to + * create/drop a database in an idempotent maner without + * transaction block restrictions. + */ +Datum +citus_internal_database_command(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + if (!ShouldSkipMetadataChecks()) + { + EnsureCoordinatorInitiatedOperation(); + } + PG_ENSURE_ARGNOTNULL(0, "database command"); + + text *commandText = PG_GETARG_TEXT_P(0); + char *command = text_to_cstring(commandText); + Node *parseTree = ParseTreeNode(command); + + int saveNestLevel = NewGUCNestLevel(); + + set_config_option("citus.enable_ddl_propagation", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + set_config_option("citus.enable_create_database_propagation", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + /* + * createdb() / DropDatabase() uses ParseState to report the error position for the + * input command and the position is reported to be 0 when it's provided as NULL. + * We're okay with that because we don't expect this UDF to be called with an incorrect + * DDL command. + */ + ParseState *pstate = NULL; + + if (IsA(parseTree, CreatedbStmt)) + { + CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree); + + bool missingOk = true; + Oid databaseOid = get_database_oid(stmt->dbname, missingOk); + + if (!OidIsValid(databaseOid)) + { + createdb(pstate, (CreatedbStmt *) parseTree); + } + } + else if (IsA(parseTree, DropdbStmt)) + { + DropdbStmt *stmt = castNode(DropdbStmt, parseTree); + + bool missingOk = false; + Oid databaseOid = get_database_oid(stmt->dbname, missingOk); + + + if (OidIsValid(databaseOid)) + { + DropDatabase(pstate, (DropdbStmt *) parseTree); + } + } + else + { + ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree)))); + } + + /* Rollbacks GUCs to the state before this session */ + AtEOXact_GUC(true, saveNestLevel); + + PG_RETURN_VOID(); +} + + /* * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. */ @@ -4503,7 +4581,7 @@ PropagateNodeWideObjectsCommandList(void) if (EnableCreateDatabasePropagation) { - /* Get commands for database creation */ + /* get commands for database creation */ List *createDatabaseCommands = GenerateCreateDatabaseCommandList(); ddlCommands = list_concat(ddlCommands, createDatabaseCommands); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 32ad4c427..f06e0f2b0 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1266,7 +1266,7 @@ RegisterCitusConfigVariables(void) DefineCustomBoolVariable( "citus.enable_create_database_propagation", gettext_noop("Enables propagating CREATE DATABASE " - "and DROP DATABASE statements to workers"), + "and DROP DATABASE statements to workers."), NULL, &EnableCreateDatabasePropagation, false, diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql index 232e3ad14..b20f6278e 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql @@ -4,7 +4,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) RETURNS void LANGUAGE C - STRICT + VOLATILE AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS 'run a database command without transaction block restrictions'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql index 232e3ad14..b20f6278e 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql @@ -4,7 +4,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) RETURNS void LANGUAGE C - STRICT + VOLATILE AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS 'run a database command without transaction block restrictions'; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 0e43fa386..85c55d39a 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -230,9 +230,6 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que ProcessUtilityContext processUtilityContext); -extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool - isPostprocess); - extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); @@ -242,10 +239,10 @@ extern List * PreprocessCreateDatabaseStmt(Node *node, const char *queryString, extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString); extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool - isPostprocess); -extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool - isPostprocess); +extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, + bool isPostprocess); +extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, + bool isPostprocess); extern List * GenerateCreateDatabaseCommandList(void); @@ -518,7 +515,6 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool extern void UnmarkRolesDistributed(List *roles); extern List * FilterDistributedRoles(List *roles); -extern void EnsureSequentialModeForRoleDDL(void); /* schema.c - forward declarations */ extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 1790eb468..5a0b0fed3 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -94,7 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); -extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands); +extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 66ead2b4d..59e6d40c6 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -123,12 +123,12 @@ extern void AppendGrantSharedSuffix(StringInfo buf, GrantStmt *stmt); /* Common deparser utils */ -struct option_format +typedef struct DefElemOptionFormat { - const char *name; - const char *format; - const int type; -}; + char *name; + char *format; + int type; +} DefElemOptionFormat; typedef enum OptionFormatType { @@ -140,9 +140,9 @@ typedef enum OptionFormatType } OptionFormatType; -extern void optionToStatement(StringInfo buf, DefElem *option, const struct - option_format *opt_formats, int - opt_formats_len); +extern void DefElemOptionToStatement(StringInfo buf, DefElem *option, const + DefElemOptionFormat *opt_formats, int + opt_formats_len); /* forward declarations for deparse_statistics_stmts.c */ diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 86fada5f7..cf24a8c81 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -21,13 +21,12 @@ extern bool ObjectExists(const ObjectAddress *address); extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); -extern bool IsObjectDistributed(const ObjectAddress *address); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); -extern void UnmarkRolesAndDatabaseDistributed(Node *node); +extern void UnmarkNodeWideObjectsDistributed(Node *node); extern bool IsTableOwnedByExtension(Oid relationId); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool IsAnyObjectAddressOwnedByExtension(const List *targets, diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 8f391c444..b28341e5c 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -151,8 +151,6 @@ DEPS = { ], worker_count=6, ), - "create_drop_database_propagation": TestDeps("minimal_schedule"), - "create_drop_database_propagation_pg15": TestDeps("minimal_schedule"), "function_propagation": TestDeps("minimal_schedule"), "citus_shards": TestDeps("minimal_schedule"), "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),