diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index db0fdb8c4..5bd84fb9c 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -192,6 +192,25 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, } +/* + * IsSetTablespaceStatement returns true if the statement is a SET TABLESPACE statement, + * false otherwise. + */ +static bool +IsSetTablespaceStatement(AlterDatabaseStmt *stmt) +{ + DefElem *def = NULL; + foreach_ptr(def, stmt->options) + { + if (strcmp(def->defname, "tablespace") == 0) + { + return true; + } + } + return false; +} + + /* * PreprocessAlterDatabaseStmt is executed before the statement is applied to the local * postgres instance. @@ -203,22 +222,38 @@ List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { - if (!ShouldPropagate()) + bool missingOk = false; + AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname, + missingOk); + + if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress))) { return NIL; } - AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node); - EnsureCoordinator(); char *sql = DeparseTreeNode((Node *) stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, + sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + if (IsSetTablespaceStatement(stmt)) + { + /* + * Set tablespace does not work inside a transaction.Therefore, we need to use + * NontransactionalNodeDDLTask to run the command on the workers outside + * the transaction block. + */ + + return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands); + } + else + { + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + } } @@ -256,6 +291,36 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString, #endif +/* + * PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to the local + * postgres instance. In this stage we prepare ALTER DATABASE RENAME statement to be run on + * all workers. + */ +List * +PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString) +{ + bool missingOk = false; + RenameStmt *stmt = castNode(RenameStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->newname, + missingOk); + + if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress))) + { + return NIL; + } + + EnsureCoordinator(); + + char *sql = DeparseTreeNode((Node *) stmt); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + /* * PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local * postgres instance. diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index b7f8c312d..eb454d70d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -533,6 +533,16 @@ static DistributeObjectOps Database_Set = { .markDistributed = false, }; +static DistributeObjectOps Database_Rename = { + .deparse = DeparseAlterDatabaseRenameStmt, + .qualify = NULL, + .preprocess = NULL, + .postprocess = PostprocessAlterDatabaseRenameStmt, + .objectType = OBJECT_DATABASE, + .operationType = DIST_OPS_ALTER, + .address = NULL, + .markDistributed = false, +}; static DistributeObjectOps Domain_Alter = { .deparse = DeparseAlterDomainStmt, @@ -2103,6 +2113,11 @@ GetDistributeObjectOps(Node *node) return &Collation_Rename; } + case OBJECT_DATABASE: + { + return &Database_Rename; + } + case OBJECT_DOMAIN: { return &Domain_Rename; diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index b76a6d5bf..6d2dd0ba9 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + NIL, distArgumentIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index b62dda9ad..9e6b66e3e 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -34,6 +34,7 @@ #include "access/htup_details.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/pg_authid.h" #include "catalog/pg_database.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -44,6 +45,7 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "postmaster/postmaster.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -77,6 +79,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/reference_table_utils.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" #include "distributed/transaction_management.h" @@ -84,6 +87,13 @@ #include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" +#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \ + "SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)" +#define START_MANAGEMENT_TRANSACTION \ + "SELECT citus_internal.start_management_transaction('%lu')" +#define MARK_OBJECT_DISTRIBUTED \ + "SELECT citus_internal.mark_object_distributed(%d, %s, %d)" + bool EnableDDLPropagation = true; /* ddl propagation is enabled */ int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE; @@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree); static void DecrementUtilityHookCountersIfNecessary(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree); static bool ShouldCheckUndistributeCitusLocalTables(void); +static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString); +static void RunPostprocessMainDBCommand(Node *parsetree); /* * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of @@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, if (!CitusHasBeenLoaded()) { + if (!IsMainDB) + { + RunPreprocessMainDBCommand(parsetree, queryString); + } + /* * Ensure that utility commands do not behave any differently until CREATE * EXTENSION is invoked. @@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, PrevProcessUtility(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); + if (!IsMainDB) + { + RunPostprocessMainDBCommand(parsetree); + } + return; } else if (IsA(parsetree, CallStmt)) @@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void) { return activeDropSchemaOrDBs > 0; } + + +/* + * RunPreprocessMainDBCommand runs the necessary commands for a query, in main + * database before query is run on the local node with PrevProcessUtility + */ +static void +RunPreprocessMainDBCommand(Node *parsetree, const char *queryString) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + START_MANAGEMENT_TRANSACTION, + GetCurrentFullTransactionId().value); + RunCitusMainDBQuery(mainDBQuery->data); + mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER, + quote_literal_cstr(queryString), + quote_literal_cstr(CurrentUserName())); + RunCitusMainDBQuery(mainDBQuery->data); + } +} + + +/* + * RunPostprocessMainDBCommand runs the necessary commands for a query, in main + * database after query is run on the local node with PrevProcessUtility + */ +static void +RunPostprocessMainDBCommand(Node *parsetree) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree); + Oid roleOid = get_role_oid(createRoleStmt->role, false); + appendStringInfo(mainDBQuery, + MARK_OBJECT_DISTRIBUTED, + AuthIdRelationId, + quote_literal_cstr(createRoleStmt->role), + roleOid); + RunCitusMainDBQuery(mainDBQuery->data); + } +} diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index c6a34a9d7..57069f698 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -425,11 +425,13 @@ GetConnParam(const char *keyword) /* * GetEffectiveConnKey checks whether there is any pooler configuration for the - * provided key (host/port combination). The one case where this logic is not - * applied is for loopback connections originating within the task tracker. If - * a corresponding row is found in the poolinfo table, a modified (effective) - * key is returned with the node, port, and dbname overridden, as applicable, - * otherwise, the original key is returned unmodified. + * provided key (host/port combination). If a corresponding row is found in the + * poolinfo table, a modified (effective) key is returned with the node, port, + * and dbname overridden, as applicable, otherwise, the original key is returned + * unmodified. + * + * In the case of Citus non-main databases we just return the key, since we + * would not have access to tables with worker information. */ ConnectionHashKey * GetEffectiveConnKey(ConnectionHashKey *key) @@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key) return key; } + if (!CitusHasBeenLoaded()) + { + /* + * This happens when we connect to main database over localhost + * from some non Citus database. + */ + return key; + } + WorkerNode *worker = FindWorkerNode(key->hostname, key->port); + if (worker == NULL) { /* this can be hit when the key references an unknown node */ diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 3614ba797..30ac3f32c 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -30,12 +30,14 @@ static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt); static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); -static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); static void AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt); static void AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt); static void AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt); +static void AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt); +static void AppendGrantDatabases(StringInfo buf, GrantStmt *stmt); +static void AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname); -const DefElemOptionFormat create_database_option_formats[] = { +const DefElemOptionFormat createDatabaseOptionFormats[] = { { "owner", " OWNER %s", OPTION_FORMAT_STRING }, { "template", " TEMPLATE %s", OPTION_FORMAT_STRING }, { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR }, @@ -53,6 +55,14 @@ const DefElemOptionFormat create_database_option_formats[] = { { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN } }; + +const DefElemOptionFormat alterDatabaseOptionFormats[] = { + { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN }, + { "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN }, + { "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER }, +}; + + char * DeparseAlterDatabaseOwnerStmt(Node *node) { @@ -112,48 +122,63 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt) static void -AppendDefElemConnLimit(StringInfo buf, DefElem *def) +AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt) { - appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def)); + if (list_length(stmt->options) == 0) + { + elog(ERROR, "got unexpected number of options for ALTER DATABASE"); + } + + if (stmt->options) + { + DefElem *firstOption = linitial(stmt->options); + if (strcmp(firstOption->defname, "tablespace") == 0) + { + AppendAlterDatabaseSetTablespace(buf, firstOption, stmt->dbname); + + /* SET tablespace cannot be combined with other options */ + return; + } + + + appendStringInfo(buf, "ALTER DATABASE %s WITH", + quote_identifier(stmt->dbname)); + + AppendBasicAlterDatabaseOptions(buf, stmt); + } + + appendStringInfo(buf, ";"); } static void -AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt) +AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname) { - appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname)); + appendStringInfo(buf, + "ALTER DATABASE %s SET TABLESPACE %s", + quote_identifier(dbname), quote_identifier(defGetString(def))); +} - if (stmt->options) + +/* + * AppendBasicAlterDatabaseOptions appends basic ALTER DATABASE options to a string buffer. + * Basic options are those that can be appended to the ALTER DATABASE statement + * after the "WITH" keyword.(i.e. ALLOW_CONNECTIONS, CONNECTION LIMIT, IS_TEMPLATE) + * For example, the tablespace option is not a basic option since it is defined via SET keyword. + * + * This function takes a string buffer and an AlterDatabaseStmt as input. + * It appends the basic options to the string buffer. + * + */ +static void +AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt) +{ + DefElem *def = NULL; + foreach_ptr(def, stmt->options) { - ListCell *cell = NULL; - appendStringInfo(buf, "WITH "); - foreach(cell, stmt->options) - { - DefElem *def = castNode(DefElem, lfirst(cell)); - if (strcmp(def->defname, "is_template") == 0) - { - appendStringInfo(buf, "IS_TEMPLATE %s", - quote_literal_cstr(strVal(def->arg))); - } - else if (strcmp(def->defname, "connection_limit") == 0) - { - AppendDefElemConnLimit(buf, def); - } - else if (strcmp(def->defname, "allow_connections") == 0) - { - ereport(ERROR, - errmsg("ALLOW_CONNECTIONS is not supported")); - } - else - { - ereport(ERROR, - errmsg("unrecognized ALTER DATABASE option: %s", - def->defname)); - } - } + DefElemOptionToStatement(buf, def, alterDatabaseOptionFormats, lengthof( + alterDatabaseOptionFormats)); } - - appendStringInfo(buf, ";"); } @@ -216,6 +241,22 @@ AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt) } +char * +DeparseAlterDatabaseRenameStmt(Node *node) +{ + RenameStmt *stmt = (RenameStmt *) node; + + StringInfoData str; + initStringInfo(&str); + + appendStringInfo(&str, "ALTER DATABASE %s RENAME TO %s", + quote_identifier(stmt->subname), + quote_identifier(stmt->newname)); + + return str.data; +} + + char * DeparseAlterDatabaseSetStmt(Node *node) { @@ -246,8 +287,8 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) DefElem *option = NULL; foreach_ptr(option, stmt->options) { - DefElemOptionToStatement(buf, option, create_database_option_formats, - lengthof(create_database_option_formats)); + DefElemOptionToStatement(buf, option, createDatabaseOptionFormats, + lengthof(createDatabaseOptionFormats)); } } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 25d976a55..edccd86b9 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -49,18 +49,42 @@ #include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" -static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, + char *objectName); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); static bool IsObjectDistributed(const ObjectAddress *address); +PG_FUNCTION_INFO_V1(mark_object_distributed); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); +/* + * mark_object_distributed adds an object to pg_dist_object + * in all of the nodes. + */ +Datum +mark_object_distributed(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + Oid classId = PG_GETARG_OID(0); + text *objectNameText = PG_GETARG_TEXT_P(1); + char *objectName = text_to_cstring(objectNameText); + Oid objectId = PG_GETARG_OID(2); + ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*objectAddress, classId, objectId); + MarkObjectDistributedWithName(objectAddress, objectName); + PG_RETURN_VOID(); +} + + /* * citus_unmark_object_distributed(classid oid, objid oid, objsubid int) * @@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address) void MarkObjectDistributed(const ObjectAddress *distAddress) { + MarkObjectDistributedWithName(distAddress, ""); +} + + +/* + * MarkObjectDistributedWithName marks an object as a distributed object. + * Same as MarkObjectDistributed but this function also allows passing an objectName + * that is used in case the object does not exists for the current transaction. + */ +void +MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName) +{ + if (!CitusHasBeenLoaded()) + { + elog(ERROR, "Cannot mark object distributed because Citus has not been loaded."); + } MarkObjectDistributedLocally(distAddress); if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, objectName); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); } } @@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, ""); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); } } @@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId) * for the given object address. */ static char * -CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName) { /* create a list by adding the address of value to not to have warning */ List *objectAddressList = list_make1((ObjectAddress *) objectAddress); + + /* names also require a list so we create a nested list here */ + List *objectNameList = list_make1(list_make1((char *) objectName)); List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + objectNameList, distArgumetIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1b2fa229f..b8983ba21 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -79,6 +79,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/shardinterval_utils.h" #include "distributed/shared_library_init.h" #include "distributed/utils/array_type.h" @@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId) char * GetAuthinfoViaCatalog(const char *roleName, int64 nodeId) { + /* + * Citus will not be loaded when we run a global DDL command from a + * Citus non-main database. + */ + if (!CitusHasBeenLoaded()) + { + return ""; + } char *authinfo = ""; Datum nodeIdDatumArray[2] = { Int32GetDatum(nodeId), diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f0be1995b..842a45519 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -83,6 +83,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/tenant_schema_metadata.h" #include "distributed/utils/array_type.h" @@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList) */ char * MarkObjectsDistributedCreateCommand(List *addresses, + List *namesArg, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations) @@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses, int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter); List *names = NIL; List *args = NIL; + char *objectType = NULL; - char *objectType = getObjectTypeDescription(address, false); - getObjectIdentityParts(address, &names, &args, false); + if (IsMainDBCommand) + { + /* + * When we try to distribute an object that's being created in a non Citus + * main database, we cannot find the name, since the object is not visible + * in Citus main database. + * Because of that we need to pass the name to this function. + */ + names = list_nth(namesArg, currentObjectCounter); + bool missingOk = false; + objectType = getObjectTypeDescription(address, missingOk); + } + else + { + objectType = getObjectTypeDescription(address, false); + getObjectIdentityParts(address, &names, &args, IsMainDBCommand); + } if (!isFirstObject) { @@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context) char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), + NIL, list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ffb235596..ad5a14a25 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -94,6 +94,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/repartition_executor.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/resource_lock.h" @@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomStringVariable( + "citus.superuser", + gettext_noop("Name of a superuser role to be used in Citus main database " + "connections"), + NULL, + &SuperuserRole, + "", + PGC_SUSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.task_assignment_policy", gettext_noop("Sets the policy to use when assigning tasks to worker nodes."), @@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status) */ InitializeBackendData(port->application_name); + IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 || + strncmp(MainDb, port->database_name, NAMEDATALEN) == 0); /* let other authentication hooks to kick in first */ if (original_client_auth_hook) diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 63c4a527f..2ce2d7a21 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -3,3 +3,10 @@ #include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql" + +#include "udfs/start_management_transaction/12.2-1.sql" +#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql" +#include "udfs/mark_object_distributed/12.2-1.sql" +#include "udfs/commit_management_command_2pc/12.2-1.sql" + +ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8; diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index d18f7257b..0a6f68b06 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -3,3 +3,20 @@ DROP FUNCTION pg_catalog.citus_internal_database_command(text); #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql" + +DROP FUNCTION citus_internal.start_management_transaction( + outer_xid xid8 +); + +DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user( + query text, + username text +); + +DROP FUNCTION citus_internal.mark_object_distributed( + classId Oid, objectName text, objectId Oid +); + +DROP FUNCTION citus_internal.commit_management_command_2pc(); + +ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql new file mode 100644 index 000000000..ec1f416d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql new file mode 100644 index 000000000..ec1f416d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 3dc89c995..71b6a78dd 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -19,14 +19,19 @@ #include "miscadmin.h" #include "access/xact.h" +#include "postmaster/postmaster.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/xid8.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" +#include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +PG_FUNCTION_INFO_V1(start_management_transaction); +PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user); +PG_FUNCTION_INFO_V1(commit_management_command_2pc); static char *IsolationLevelName[] = { "READ UNCOMMITTED", @@ -64,6 +72,154 @@ static char *IsolationLevelName[] = { "SERIALIZABLE" }; +/* + * These variables are necessary for running queries from a database that is not + * the Citus main database. Some of these queries need to be propagated to the + * workers and Citus main database will be used for these queries, such as + * CREATE ROLE. For that we create a connection to the Citus main database and + * run queries from there. + */ + +/* The MultiConnection used for connecting Citus main database. */ +MultiConnection *MainDBConnection = NULL; + +/* + * IsMainDBCommand is true if this is a query in the Citus main database that is started + * by a query from a different database. + */ +bool IsMainDBCommand = false; + +/* + * The transaction id of the query from the other database that started the + * main database query. + */ +FullTransactionId OuterXid; + +/* + * Shows if this is the Citus main database or not. We needed a variable instead of + * checking if this database's name is the same as MainDb because we sometimes need + * this value outside a transaction where we cannot reach the current database name. + */ +bool IsMainDB = true; + +/* + * Name of a superuser role to be used during main database connections. + */ +char *SuperuserRole = NULL; + + +/* + * start_management_transaction starts a management transaction + * in the main database by recording the outer transaction's transaction id and setting + * IsMainDBCommand to true. + */ +Datum +start_management_transaction(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + OuterXid = PG_GETARG_FULLTRANSACTIONID(0); + IsMainDBCommand = true; + + Use2PCForCoordinatedTransaction(); + + PG_RETURN_VOID(); +} + + +/* + * execute_command_on_remote_nodes_as_user executes the query on the nodes + * other than the current node, using the user passed. + */ +Datum +execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + text *queryText = PG_GETARG_TEXT_P(0); + char *query = text_to_cstring(queryText); + + text *usernameText = PG_GETARG_TEXT_P(1); + char *username = text_to_cstring(usernameText); + + StringInfo queryToSend = makeStringInfo(); + + appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query, + ENABLE_METADATA_SYNC); + + SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data); + PG_RETURN_VOID(); +} + + +/* + * commit_management_command_2pc is a wrapper UDF for + * CoordinatedRemoteTransactionsCommit + */ +Datum +commit_management_command_2pc(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + RecoverTwoPhaseCommits(); + + PG_RETURN_VOID(); +} + + +/* + * RunCitusMainDBQuery creates a connection to Citus main database if necessary + * and runs the query over the connection in the main database. + */ +void +RunCitusMainDBQuery(char *query) +{ + if (MainDBConnection == NULL) + { + if (strlen(SuperuserRole) == 0) + { + ereport(ERROR, (errmsg("No superuser role is given for Citus main " + "database connection"), + errhint("Set citus.superuser to a superuser role name"))); + } + int flags = 0; + MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName, + PostPortNumber, + SuperuserRole, + MainDb); + RemoteTransactionBegin(MainDBConnection); + } + + SendRemoteCommand(MainDBConnection, query); + + PGresult *result = GetRemoteCommandResult(MainDBConnection, true); + + if (!IsResponseOK(result)) + { + ReportResultError(MainDBConnection, result, ERROR); + } + + ForgetResults(MainDBConnection); +} + + +/* + * CleanCitusMainDBConnection closes and removes the connection to Citus main database. + */ +void +CleanCitusMainDBConnection(void) +{ + if (MainDBConnection == NULL) + { + return; + } + CloseConnection(MainDBConnection); + MainDBConnection = NULL; +} + /* * StartRemoteTransactionBegin initiates beginning the remote transaction in @@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port); if (workerNode != NULL) { - LogTransactionRecord(workerNode->groupId, transaction->preparedName); + LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid); } /* diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e2cfab331..29f5b367e 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "catalog/dependency.h" #include "common/hashfn.h" #include "nodes/print.h" +#include "postmaster/postmaster.h" #include "storage/fd.h" #include "utils/datum.h" #include "utils/guc.h" @@ -46,6 +47,7 @@ #include "distributed/multi_logical_replication.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" +#include "distributed/remote_commands.h" #include "distributed/repartition_join_execution.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/shard_cleaner.h" @@ -55,6 +57,9 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" +#define COMMIT_MANAGEMENT_COMMAND_2PC \ + "SELECT citus_internal.commit_management_command_2pc()" + CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) MemoryContext previousContext = MemoryContextSwitchTo(CitusXactCallbackContext); - if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED && + !IsMainDBCommand) { /* handles both already prepared and open transactions */ CoordinatedRemoteTransactionsCommit(); } + /* + * If this is a non-Citus main database we should try to commit the prepared + * transactions created by the Citus main database on the worker nodes. + */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC); + CleanCitusMainDBConnection(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) RemoveIntermediateResultsDirectories(); + CleanCitusMainDBConnection(); + /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { @@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) break; } + + /* + * If this is a non-Citus main database we should commit the Citus + * main database query. So if some error happens on the distributed main + * database query we wouldn't have committed the current query. + */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery("COMMIT"); + } + /* * TODO: It'd probably be a good idea to force constraints and * such to 'immediate' here. Deferred triggers might try to send @@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * us to mark failed placements as invalid. Better don't use * this for anything important (i.e. DDL/metadata). */ - CoordinatedRemoteTransactionsCommit(); + if (IsMainDB) + { + CoordinatedRemoteTransactionsCommit(); + } CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 0ec5ba0a3..653b962db 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -29,10 +29,12 @@ #include "lib/stringinfo.h" #include "storage/lmgr.h" #include "storage/lock.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/xid8.h" #include "pg_version_constants.h" @@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) * prepared transaction should be committed. */ void -LogTransactionRecord(int32 groupId, char *transactionName) +LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid) { Datum values[Natts_pg_dist_transaction]; bool isNulls[Natts_pg_dist_transaction]; @@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName) values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid); /* open transaction relation and insert new tuple */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), @@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode) continue; } + /* Check if the transaction is created by an outer transaction from a non-main database */ + bool outerXidIsNull = false; + Datum outerXidDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_outerxid, + tupleDescriptor, &outerXidIsNull); + + TransactionId outerXid = 0; + if (!outerXidIsNull) + { + FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum); + outerXid = XidFromFullTransactionId(outerFullXid); + } + + if (outerXid != 0) + { + bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid); + bool outerXactDidCommit = TransactionIdDidCommit(outerXid); + if (outerXactIsInProgress && !outerXactDidCommit) + { + /* + * The transaction is initiated from an outer transaction and the outer + * transaction is not yet committed, so we should not commit either. + * We remove this transaction from the pendingTransactionSet so it'll + * not be aborted by the loop below. + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &foundPreparedTransactionBeforeCommit); + continue; + } + else if (!outerXactIsInProgress && !outerXactDidCommit) + { + /* + * Since outer transaction isn't in progress and did not commit we need to + * abort the prepared transaction too. We do this by simply doing the same + * thing we would do for transactions that are initiated from the main + * database. + */ + continue; + } + else + { + /* + * Outer transaction did commit, so we can try to commit the prepared + * transaction too. + */ + } + } + /* * Remove the transaction from the pending list such that only transactions * that need to be aborted remain at the end. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 2af1e9a6c..9c8563de0 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -234,7 +234,8 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES || + targetWorkerSet == METADATA_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 3c536454c..99bf81843 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -244,6 +244,8 @@ extern List * DropDatabaseStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess); extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess); +extern List * GenerateGrantDatabaseCommandList(void); +extern List * PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString); extern void EnsureSupportedCreateDatabaseCommand(CreatedbStmt *stmt); extern char * CreateDatabaseDDLCommand(Oid dbId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 9177eb951..22636b401 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -252,6 +252,7 @@ extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node); extern char * DeparseAlterDatabaseSetStmt(Node *node); extern char * DeparseCreateDatabaseStmt(Node *node); extern char * DeparseDropDatabaseStmt(Node *node); +extern char * DeparseAlterDatabaseRenameStmt(Node *node); /* forward declaration for deparse_publication_stmts.c */ diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index cf24a8c81..bbbbdf9da 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e20c44535..9f4c0a24b 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void); extern List * CitusTableMetadataCreateCommandList(Oid relationId); extern List * NodeMetadataDropCommands(void); extern char * MarkObjectsDistributedCreateCommand(List *addresses, + List *names, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations); diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h index 815633b70..95658f782 100644 --- a/src/include/distributed/pg_dist_transaction.h +++ b/src/include/distributed/pg_dist_transaction.h @@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; * compiler constants for pg_dist_transaction * ---------------- */ -#define Natts_pg_dist_transaction 2 +#define Natts_pg_dist_transaction 3 #define Anum_pg_dist_transaction_groupid 1 #define Anum_pg_dist_transaction_gid 2 +#define Anum_pg_dist_transaction_outerxid 3 #endif /* PG_DIST_TRANSACTION_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 1c422da20..2b61c25bd 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId); +extern void RunCitusMainDBQuery(char *query); +extern void CleanCitusMainDBConnection(void); + +extern bool IsMainDBCommand; +extern bool IsMainDB; +extern char *SuperuserRole; +extern char *MainDb; +extern struct MultiConnection *MainDBConnection; + #endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 811dbb949..a4073875a 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,7 +17,8 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ -extern void LogTransactionRecord(int32 groupId, char *transactionName); +extern void LogTransactionRecord(int32 groupId, char *transactionName, + FullTransactionId outerXid); extern int RecoverTwoPhaseCommits(void); extern void DeleteWorkerTransactions(WorkerNode *workerNode); diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py new file mode 100644 index 000000000..cf824f926 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -0,0 +1,154 @@ +def test_main_commited_outer_not_yet(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db1") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db1") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" + ) + cur2.execute("COMMIT") + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # user should not be created on the worker because outer transaction is not committed yet + role_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + int(role_before_commit) == 0 + ), "role is on pg_dist_object despite not committing" + + # user should not be in pg_dist_object on the worker because outer transaction is not committed yet + pdo_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert int(pdo_before_commit) == 0, "role is created despite not committing" + + # commit in cur1 so the transaction recovery thinks this is a successful transaction + cur1.execute("COMMIT") + + # run the transaction recovery again after committing + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is created by the transaction recovery on the worker + role_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + int(role_after_commit) == 1 + ), "role is not created during recovery despite committing" + + # check that the user is on pg_dist_object on the worker after transaction recovery + pdo_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert ( + int(pdo_after_commit) == 1 + ), "role is not on pg_dist_object after recovery despite committing" + + c.sql("DROP DATABASE db1") + c.sql( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')" + ) + c.sql( + """ + SELECT run_command_on_workers($$ + DELETE FROM pg_dist_object + WHERE objid::regrole::text = 'u1' + $$) + """ + ) + + +def test_main_commited_outer_aborted(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db2") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db2") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" + ) + cur2.execute("COMMIT") + + # abort cur1 so the transaction recovery thinks this is an aborted transaction + cur1.execute("ABORT") + + # check that the user is not yet created on the worker + role_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert int(role_before_recovery) == 0, "role is already created before recovery" + + # check that the user is not on pg_dist_object on the worker + pdo_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_before_recovery) == 0 + ), "role is already on pg_dist_object before recovery" + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is not created by the transaction recovery on the worker + role_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert ( + int(role_after_recovery) == 0 + ), "role is created during recovery despite aborting" + + # check that the user is not on pg_dist_object on the worker after transaction recovery + pdo_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_after_recovery) == 0 + ), "role is on pg_dist_object after recovery despite aborting" + + c.sql("DROP DATABASE db2") diff --git a/src/test/regress/expected/alter_database_propagation.out b/src/test/regress/expected/alter_database_propagation.out index 0ce217749..1a56f1338 100644 --- a/src/test/regress/expected/alter_database_propagation.out +++ b/src/test/regress/expected/alter_database_propagation.out @@ -1,38 +1,30 @@ set citus.log_remote_commands = true; set citus.grep_remote_commands = '%ALTER DATABASE%'; --- since ALLOW_CONNECTIONS alter option should be executed in a different database --- and since we don't have a multiple database support for now, --- this statement will get error -alter database regression ALLOW_CONNECTIONS false; -ERROR: ALLOW_CONNECTIONS is not supported alter database regression with CONNECTION LIMIT 100; -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with CONNECTION LIMIT -1; -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE true; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE false; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx --- this statement will get error since we don't have a multiple database support for now -alter database regression rename to regression2; -ERROR: current database cannot be renamed alter database regression set default_transaction_read_only = true; NOTICE: issuing ALTER DATABASE regression SET default_transaction_read_only = 'true' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -147,4 +139,86 @@ NOTICE: issuing ALTER DATABASE regression RESET lock_timeout DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing ALTER DATABASE regression RESET lock_timeout DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.enable_create_database_propagation=on; +create database "regression!'2"; +alter database "regression!'2" with CONNECTION LIMIT 100; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +alter database "regression!'2" with IS_TEMPLATE false; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - - - :worker_1_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - - - :worker_2_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - - - :master_port +set citus.log_remote_commands = true; +set citus.grep_remote_commands = '%ALTER DATABASE%'; +alter database "regression!'2" set TABLESPACE alter_db_tablespace; +NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.enable_create_database_propagation=on; +alter database "regression!'2" rename to regression3; +NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- check that the local database rename and alter comnmand is not propagated +set citus.enable_create_database_propagation=off; +CREATE database local_regression; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +alter DATABASE local_regression with CONNECTION LIMIT 100; +alter DATABASE local_regression rename to local_regression2; +drop database local_regression2; +set citus.enable_create_database_propagation=on; +drop database regression3; +create database "regression!'4"; +SELECT result FROM run_command_on_all_nodes( + $$ + ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape" + $$ +); + result +--------------------------------------------------------------------- + ALTER TABLESPACE + ALTER TABLESPACE + ALTER TABLESPACE +(3 rows) + +alter database "regression!'4" set TABLESPACE "ts-needs\!escape"; +NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +drop database "regression!'4"; set citus.log_remote_commands = false; +set citus.enable_create_database_propagation=off; +SELECT result FROM run_command_on_all_nodes( + $$ + drop tablespace "ts-needs\!escape" + $$ +); + result +--------------------------------------------------------------------- + DROP TABLESPACE + DROP TABLESPACE + DROP TABLESPACE +(3 rows) + diff --git a/src/test/regress/expected/create_drop_database_propagation.out b/src/test/regress/expected/create_drop_database_propagation.out index e0172f3e8..eb637f8c2 100644 --- a/src/test/regress/expected/create_drop_database_propagation.out +++ b/src/test/regress/expected/create_drop_database_propagation.out @@ -209,19 +209,7 @@ SELECT result FROM run_command_on_all_nodes( CREATE USER "role-needs\!escape"; CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape"; -- Rename it to make check_database_on_all_nodes happy. --- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually. -SELECT result FROM run_command_on_all_nodes( - $$ - ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape - $$ -); - result ---------------------------------------------------------------------- - ALTER DATABASE - ALTER DATABASE - ALTER DATABASE -(3 rows) - +ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape; SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type; node_type | result --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_non_main_db_2pc.out b/src/test/regress/expected/failure_non_main_db_2pc.out new file mode 100644 index 000000000..1e8558136 --- /dev/null +++ b/src/test/regress/expected/failure_non_main_db_2pc.out @@ -0,0 +1,154 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_1; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | user_1 +(3 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_2; +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +while executing command on localhost:xxxxx +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; +NOTICE: role "user_2" does not exist, skipping +SELECT citus_set_coordinator_host('localhost'); + citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db2 +CREATE USER user_3; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + + user_3 + user_3 +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + user_3 + user_3 + user_3 +(3 rows) + +DROP DATABASE other_db2; +DROP USER user_3; +\c - - - :master_port +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + result +--------------------------------------------------------------------- + DELETE 1 + DELETE 1 + DELETE 1 +(3 rows) + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 43f9c3b98..b9f489a1f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 12.2-1 ALTER EXTENSION citus UPDATE TO '12.2-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- + | function citus_internal.commit_management_command_2pc() void + | function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void + | function citus_internal.mark_object_distributed(oid,text,oid) void + | function citus_internal.start_management_transaction(xid8) void | function citus_internal_database_command(text) void -(1 row) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 20cec7578..0a29a22af 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -64,7 +64,7 @@ SELECT recover_prepared_transactions(); (1 row) -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; groupid | gid --------------------------------------------------------------------- 122 | citus_122_should_do_nothing diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out new file mode 100644 index 000000000..67d7dad3f --- /dev/null +++ b/src/test/regress/expected/other_databases.out @@ -0,0 +1,130 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; +SET citus.next_shard_id TO 10231023; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c other_db1 +SHOW citus.main_db; + citus.main_db +--------------------------------------------------------------------- + regression +(1 row) + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +ERROR: No superuser role is given for Citus main database connection +HINT: Set citus.superuser to a superuser role name +SET citus.superuser TO 'postgres'; +CREATE USER other_db_user1; +CREATE USER other_db_user2; +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +ERROR: division by zero +COMMIT; +CREATE USER other_db_user8; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; +NOTICE: role "other_db_user5" does not exist, skipping +NOTICE: role "other_db_user6" does not exist, skipping +NOTICE: role "other_db_user7" does not exist, skipping +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); +ERROR: operation is not allowed +HINT: Run the command with a superuser. +\c other_db1 +CREATE USER other_db_user9; +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; +-- test from a worker +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c other_db2 +CREATE USER worker_user1; +BEGIN; +CREATE USER worker_user2; +COMMIT; +BEGIN; +CREATE USER worker_user3; +ROLLBACK; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; +NOTICE: role "worker_user3" does not exist, skipping +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port +DROP SCHEMA other_databases; +DROP DATABASE other_db1; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 942e0336f..6d41ac058 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -56,13 +56,17 @@ ORDER BY 1; function citus_get_active_worker_nodes() function citus_get_node_clock() function citus_get_transaction_clock() + function citus_internal.commit_management_command_2pc() + function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.find_groupid_for_node(text,integer) + function citus_internal.mark_object_distributed(oid,text,oid) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func() function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function citus_internal.start_management_transaction(xid8) function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") @@ -344,5 +348,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(334 rows) +(338 rows) diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index afc4780bf..e1ad362b5 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -34,6 +34,7 @@ test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_mx_metadata_sync_multi_trans test: failure_connection_establishment +test: failure_non_main_db_2pc # this test syncs metadata to the workers test: failure_failover_to_local_execution diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 866b07f5f..5c9d8a45c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,6 +108,7 @@ test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes test: background_task_queue_monitor +test: other_databases # Causal clock test test: clock diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 95330c638..c9a85d523 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.enable_change_data_capture=on"); push(@pgOptions, "citus.stat_tenants_limit = 2"); push(@pgOptions, "citus.stat_tenants_track = 'ALL'"); +push(@pgOptions, "citus.main_db = 'regression'"); +push(@pgOptions, "citus.superuser = 'postgres'"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/alter_database_propagation.sql b/src/test/regress/sql/alter_database_propagation.sql index 2b9d3ac33..4904919a6 100644 --- a/src/test/regress/sql/alter_database_propagation.sql +++ b/src/test/regress/sql/alter_database_propagation.sql @@ -1,20 +1,12 @@ set citus.log_remote_commands = true; set citus.grep_remote_commands = '%ALTER DATABASE%'; - --- since ALLOW_CONNECTIONS alter option should be executed in a different database --- and since we don't have a multiple database support for now, --- this statement will get error -alter database regression ALLOW_CONNECTIONS false; - - alter database regression with CONNECTION LIMIT 100; alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50; alter database regression with CONNECTION LIMIT -1; alter database regression with IS_TEMPLATE true; alter database regression with IS_TEMPLATE false; --- this statement will get error since we don't have a multiple database support for now -alter database regression rename to regression2; + alter database regression set default_transaction_read_only = true; @@ -56,4 +48,66 @@ alter database regression set lock_timeout from current; alter database regression set lock_timeout to DEFAULT; alter database regression RESET lock_timeout; +set citus.enable_create_database_propagation=on; +create database "regression!'2"; +alter database "regression!'2" with CONNECTION LIMIT 100; +alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50; +alter database "regression!'2" with IS_TEMPLATE false; + + + + +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :worker_1_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :worker_2_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :master_port + +set citus.log_remote_commands = true; +set citus.grep_remote_commands = '%ALTER DATABASE%'; + +alter database "regression!'2" set TABLESPACE alter_db_tablespace; + +set citus.enable_create_database_propagation=on; +alter database "regression!'2" rename to regression3; + +-- check that the local database rename and alter comnmand is not propagated +set citus.enable_create_database_propagation=off; +CREATE database local_regression; + +alter DATABASE local_regression with CONNECTION LIMIT 100; +alter DATABASE local_regression rename to local_regression2; +drop database local_regression2; + +set citus.enable_create_database_propagation=on; + +drop database regression3; + +create database "regression!'4"; + + +SELECT result FROM run_command_on_all_nodes( + $$ + ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape" + $$ +); + +alter database "regression!'4" set TABLESPACE "ts-needs\!escape"; + +drop database "regression!'4"; + set citus.log_remote_commands = false; +set citus.enable_create_database_propagation=off; + +SELECT result FROM run_command_on_all_nodes( + $$ + drop tablespace "ts-needs\!escape" + $$ +); diff --git a/src/test/regress/sql/create_drop_database_propagation.sql b/src/test/regress/sql/create_drop_database_propagation.sql index c83548d68..c71841eee 100644 --- a/src/test/regress/sql/create_drop_database_propagation.sql +++ b/src/test/regress/sql/create_drop_database_propagation.sql @@ -129,13 +129,8 @@ CREATE USER "role-needs\!escape"; CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape"; -- Rename it to make check_database_on_all_nodes happy. --- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually. -SELECT result FROM run_command_on_all_nodes( - $$ - ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape - $$ -); +ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape; SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type; -- test database syncing after node addition @@ -541,6 +536,7 @@ REVOKE CONNECT ON DATABASE test_db FROM propagated_role; DROP DATABASE test_db; DROP ROLE propagated_role, non_propagated_role; + --clean up resources created by this test -- DROP TABLESPACE is not supported, so we need to drop it manually. diff --git a/src/test/regress/sql/failure_non_main_db_2pc.sql b/src/test/regress/sql/failure_non_main_db_2pc.sql new file mode 100644 index 000000000..74061ae34 --- /dev/null +++ b/src/test/regress/sql/failure_non_main_db_2pc.sql @@ -0,0 +1,75 @@ +SELECT citus.mitmproxy('conn.allow()'); + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; + +CREATE DATABASE other_db1; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db1 + +CREATE USER user_1; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + +\c other_db1 + +CREATE USER user_2; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; + +SELECT citus_set_coordinator_host('localhost'); + +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db2 + +CREATE USER user_3; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + +DROP DATABASE other_db2; +DROP USER user_3; + +\c - - - :master_port + +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 2a6b4991b..e46917f35 100644 --- a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing'); SELECT recover_prepared_transactions(); -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; ROLLBACK PREPARED 'citus_122_should_do_nothing'; SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql new file mode 100644 index 000000000..629f74f45 --- /dev/null +++ b/src/test/regress/sql/other_databases.sql @@ -0,0 +1,98 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; + +SET citus.next_shard_id TO 10231023; + +CREATE DATABASE other_db1; + +\c other_db1 +SHOW citus.main_db; + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +SET citus.superuser TO 'postgres'; + +CREATE USER other_db_user1; +CREATE USER other_db_user2; + +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; + +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; + +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +COMMIT; + +CREATE USER other_db_user8; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; + +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); + +\c other_db1 +CREATE USER other_db_user9; + +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; + +-- test from a worker +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +\c other_db2 + +CREATE USER worker_user1; + +BEGIN; +CREATE USER worker_user2; +COMMIT; + +BEGIN; +CREATE USER worker_user3; +ROLLBACK; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; + +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port + +DROP SCHEMA other_databases; +DROP DATABASE other_db1;