From b877d606c7ab469e7584368fa73ba428905fa164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20Ozan=20Akg=C3=BCl?= Date: Fri, 22 Dec 2023 19:19:41 +0300 Subject: [PATCH 1/3] Adds 2PC distributed commands from other databases (#7203) DESCRIPTION: Adds support for 2PC from non-Citus main databases This PR only adds support for `CREATE USER` queries; other queries still need to be added. But that should be simple, because this PR creates the underlying structure. The Citus main database is the database where the Citus extension is created. The non-main databases are all the other databases that are on the same node as a Citus main database. When a `CREATE USER` query is run on a non-main database we: 1. Run `start_management_transaction` on the main database. This function saves the outer transaction's xid (the non-main database query's transaction id) and marks the current query as a main db command. 2. Run `execute_command_on_remote_nodes_as_user("CREATE USER ", )` on the main database. This function creates the users in the rest of the cluster by running the query on the other nodes. The user on the current node is created by the outer, non-main db query itself, to make sure subsequent commands in the same transaction can see this user. 3. Run `mark_object_distributed` on the main database. This function adds the user to `pg_dist_object` in all of the nodes, including the current one. This PR also implements transaction recovery for the queries from non-main databases. 
--- src/backend/distributed/commands/function.c | 1 + .../distributed/commands/utility_hook.c | 68 ++++++++ .../connection/connection_configuration.c | 22 ++- src/backend/distributed/metadata/distobject.c | 52 +++++- .../distributed/metadata/metadata_cache.c | 9 + .../distributed/metadata/metadata_sync.c | 23 ++- src/backend/distributed/shared_library_init.c | 14 ++ .../distributed/sql/citus--12.1-1--12.2-1.sql | 7 + .../sql/downgrades/citus--12.2-1--12.1-1.sql | 17 ++ .../commit_management_command_2pc/12.2-1.sql | 7 + .../commit_management_command_2pc/latest.sql | 7 + .../12.2-1.sql | 7 + .../latest.sql | 7 + .../udfs/mark_object_distributed/12.2-1.sql | 7 + .../udfs/mark_object_distributed/latest.sql | 7 + .../start_management_transaction/12.2-1.sql | 7 + .../start_management_transaction/latest.sql | 7 + .../transaction/remote_transaction.c | 158 +++++++++++++++++- .../transaction/transaction_management.c | 36 +++- .../transaction/transaction_recovery.c | 53 +++++- .../transaction/worker_transaction.c | 3 +- src/include/distributed/metadata/distobject.h | 1 + src/include/distributed/metadata_sync.h | 1 + src/include/distributed/pg_dist_transaction.h | 3 +- src/include/distributed/remote_transaction.h | 9 + .../distributed/transaction_recovery.h | 3 +- .../citus_tests/test/test_other_databases.py | 154 +++++++++++++++++ .../expected/failure_non_main_db_2pc.out | 154 +++++++++++++++++ src/test/regress/expected/multi_extension.out | 8 +- .../multi_mx_transaction_recovery.out | 2 +- src/test/regress/expected/other_databases.out | 130 ++++++++++++++ .../expected/upgrade_list_citus_objects.out | 6 +- src/test/regress/failure_schedule | 1 + src/test/regress/multi_schedule | 1 + src/test/regress/pg_regress_multi.pl | 2 + .../regress/sql/failure_non_main_db_2pc.sql | 75 +++++++++ .../sql/multi_mx_transaction_recovery.sql | 2 +- src/test/regress/sql/other_databases.sql | 98 +++++++++++ 38 files changed, 1146 insertions(+), 23 deletions(-) create mode 100644 
src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql create mode 100644 src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql create mode 100644 src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql create mode 100644 src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/start_management_transaction/latest.sql create mode 100644 src/test/regress/citus_tests/test/test_other_databases.py create mode 100644 src/test/regress/expected/failure_non_main_db_2pc.out create mode 100644 src/test/regress/expected/other_databases.out create mode 100644 src/test/regress/sql/failure_non_main_db_2pc.sql create mode 100644 src/test/regress/sql/other_databases.sql diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index b76a6d5bf..6d2dd0ba9 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + NIL, distArgumentIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index b62dda9ad..9e6b66e3e 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -34,6 +34,7 @@ #include "access/htup_details.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/pg_authid.h" #include "catalog/pg_database.h" #include 
"commands/dbcommands.h" #include "commands/defrem.h" @@ -44,6 +45,7 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "postmaster/postmaster.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -77,6 +79,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/reference_table_utils.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" #include "distributed/transaction_management.h" @@ -84,6 +87,13 @@ #include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" +#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \ + "SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)" +#define START_MANAGEMENT_TRANSACTION \ + "SELECT citus_internal.start_management_transaction('%lu')" +#define MARK_OBJECT_DISTRIBUTED \ + "SELECT citus_internal.mark_object_distributed(%d, %s, %d)" + bool EnableDDLPropagation = true; /* ddl propagation is enabled */ int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE; @@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree); static void DecrementUtilityHookCountersIfNecessary(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree); static bool ShouldCheckUndistributeCitusLocalTables(void); +static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString); +static void RunPostprocessMainDBCommand(Node *parsetree); /* * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of @@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, if (!CitusHasBeenLoaded()) { + if (!IsMainDB) + { + RunPreprocessMainDBCommand(parsetree, queryString); + } + /* * Ensure that utility commands do not behave any differently until CREATE * EXTENSION is invoked. 
@@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, PrevProcessUtility(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); + if (!IsMainDB) + { + RunPostprocessMainDBCommand(parsetree); + } + return; } else if (IsA(parsetree, CallStmt)) @@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void) { return activeDropSchemaOrDBs > 0; } + + +/* + * RunPreprocessMainDBCommand runs the necessary commands for a query, in main + * database before query is run on the local node with PrevProcessUtility + */ +static void +RunPreprocessMainDBCommand(Node *parsetree, const char *queryString) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + START_MANAGEMENT_TRANSACTION, + GetCurrentFullTransactionId().value); + RunCitusMainDBQuery(mainDBQuery->data); + mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER, + quote_literal_cstr(queryString), + quote_literal_cstr(CurrentUserName())); + RunCitusMainDBQuery(mainDBQuery->data); + } +} + + +/* + * RunPostprocessMainDBCommand runs the necessary commands for a query, in main + * database after query is run on the local node with PrevProcessUtility + */ +static void +RunPostprocessMainDBCommand(Node *parsetree) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree); + Oid roleOid = get_role_oid(createRoleStmt->role, false); + appendStringInfo(mainDBQuery, + MARK_OBJECT_DISTRIBUTED, + AuthIdRelationId, + quote_literal_cstr(createRoleStmt->role), + roleOid); + RunCitusMainDBQuery(mainDBQuery->data); + } +} diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index c6a34a9d7..57069f698 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ 
b/src/backend/distributed/connection/connection_configuration.c @@ -425,11 +425,13 @@ GetConnParam(const char *keyword) /* * GetEffectiveConnKey checks whether there is any pooler configuration for the - * provided key (host/port combination). The one case where this logic is not - * applied is for loopback connections originating within the task tracker. If - * a corresponding row is found in the poolinfo table, a modified (effective) - * key is returned with the node, port, and dbname overridden, as applicable, - * otherwise, the original key is returned unmodified. + * provided key (host/port combination). If a corresponding row is found in the + * poolinfo table, a modified (effective) key is returned with the node, port, + * and dbname overridden, as applicable, otherwise, the original key is returned + * unmodified. + * + * In the case of Citus non-main databases we just return the key, since we + * would not have access to tables with worker information. */ ConnectionHashKey * GetEffectiveConnKey(ConnectionHashKey *key) @@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key) return key; } + if (!CitusHasBeenLoaded()) + { + /* + * This happens when we connect to main database over localhost + * from some non Citus database. 
+ */ + return key; + } + WorkerNode *worker = FindWorkerNode(key->hostname, key->port); + if (worker == NULL) { /* this can be hit when the key references an unknown node */ diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 25d976a55..edccd86b9 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -49,18 +49,42 @@ #include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" -static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, + char *objectName); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); static bool IsObjectDistributed(const ObjectAddress *address); +PG_FUNCTION_INFO_V1(mark_object_distributed); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); +/* + * mark_object_distributed adds an object to pg_dist_object + * in all of the nodes. 
+ */ +Datum +mark_object_distributed(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + Oid classId = PG_GETARG_OID(0); + text *objectNameText = PG_GETARG_TEXT_P(1); + char *objectName = text_to_cstring(objectNameText); + Oid objectId = PG_GETARG_OID(2); + ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*objectAddress, classId, objectId); + MarkObjectDistributedWithName(objectAddress, objectName); + PG_RETURN_VOID(); +} + + /* * citus_unmark_object_distributed(classid oid, objid oid, objsubid int) * @@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address) void MarkObjectDistributed(const ObjectAddress *distAddress) { + MarkObjectDistributedWithName(distAddress, ""); +} + + +/* + * MarkObjectDistributedWithName marks an object as a distributed object. + * Same as MarkObjectDistributed but this function also allows passing an objectName + * that is used in case the object does not exists for the current transaction. + */ +void +MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName) +{ + if (!CitusHasBeenLoaded()) + { + elog(ERROR, "Cannot mark object distributed because Citus has not been loaded."); + } MarkObjectDistributedLocally(distAddress); if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, objectName); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); } } @@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, ""); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); } } @@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId) * for the given object address. 
*/ static char * -CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName) { /* create a list by adding the address of value to not to have warning */ List *objectAddressList = list_make1((ObjectAddress *) objectAddress); + + /* names also require a list so we create a nested list here */ + List *objectNameList = list_make1(list_make1((char *) objectName)); List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + objectNameList, distArgumetIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1b2fa229f..b8983ba21 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -79,6 +79,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/shardinterval_utils.h" #include "distributed/shared_library_init.h" #include "distributed/utils/array_type.h" @@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId) char * GetAuthinfoViaCatalog(const char *roleName, int64 nodeId) { + /* + * Citus will not be loaded when we run a global DDL command from a + * Citus non-main database. 
+ */ + if (!CitusHasBeenLoaded()) + { + return ""; + } char *authinfo = ""; Datum nodeIdDatumArray[2] = { Int32GetDatum(nodeId), diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f0be1995b..842a45519 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -83,6 +83,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/tenant_schema_metadata.h" #include "distributed/utils/array_type.h" @@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList) */ char * MarkObjectsDistributedCreateCommand(List *addresses, + List *namesArg, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations) @@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses, int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter); List *names = NIL; List *args = NIL; + char *objectType = NULL; - char *objectType = getObjectTypeDescription(address, false); - getObjectIdentityParts(address, &names, &args, false); + if (IsMainDBCommand) + { + /* + * When we try to distribute an object that's being created in a non Citus + * main database, we cannot find the name, since the object is not visible + * in Citus main database. + * Because of that we need to pass the name to this function. 
+ */ + names = list_nth(namesArg, currentObjectCounter); + bool missingOk = false; + objectType = getObjectTypeDescription(address, missingOk); + } + else + { + objectType = getObjectTypeDescription(address, false); + getObjectIdentityParts(address, &names, &args, IsMainDBCommand); + } if (!isFirstObject) { @@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context) char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), + NIL, list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ffb235596..ad5a14a25 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -94,6 +94,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/repartition_executor.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/resource_lock.h" @@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomStringVariable( + "citus.superuser", + gettext_noop("Name of a superuser role to be used in Citus main database " + "connections"), + NULL, + &SuperuserRole, + "", + PGC_SUSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.task_assignment_policy", gettext_noop("Sets the policy to use when assigning tasks to worker nodes."), @@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status) */ InitializeBackendData(port->application_name); + IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 || + strncmp(MainDb, port->database_name, NAMEDATALEN) == 0); /* let other authentication hooks to kick in first */ if (original_client_auth_hook) 
diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 63c4a527f..2ce2d7a21 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -3,3 +3,10 @@ #include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql" + +#include "udfs/start_management_transaction/12.2-1.sql" +#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql" +#include "udfs/mark_object_distributed/12.2-1.sql" +#include "udfs/commit_management_command_2pc/12.2-1.sql" + +ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8; diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index d18f7257b..0a6f68b06 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -3,3 +3,20 @@ DROP FUNCTION pg_catalog.citus_internal_database_command(text); #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql" + +DROP FUNCTION citus_internal.start_management_transaction( + outer_xid xid8 +); + +DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user( + query text, + username text +); + +DROP FUNCTION citus_internal.mark_object_distributed( + classId Oid, objectName text, objectId Oid +); + +DROP FUNCTION citus_internal.commit_management_command_2pc(); + +ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION 
citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ 
b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql new file mode 100644 index 000000000..ec1f416d0 --- 
/dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql new file mode 100644 index 000000000..ec1f416d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 3dc89c995..71b6a78dd 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -19,14 +19,19 @@ #include "miscadmin.h" #include "access/xact.h" +#include "postmaster/postmaster.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/xid8.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" +#include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/placement_connection.h" #include 
"distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +PG_FUNCTION_INFO_V1(start_management_transaction); +PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user); +PG_FUNCTION_INFO_V1(commit_management_command_2pc); static char *IsolationLevelName[] = { "READ UNCOMMITTED", @@ -64,6 +72,154 @@ static char *IsolationLevelName[] = { "SERIALIZABLE" }; +/* + * These variables are necessary for running queries from a database that is not + * the Citus main database. Some of these queries need to be propagated to the + * workers and Citus main database will be used for these queries, such as + * CREATE ROLE. For that we create a connection to the Citus main database and + * run queries from there. + */ + +/* The MultiConnection used for connecting Citus main database. */ +MultiConnection *MainDBConnection = NULL; + +/* + * IsMainDBCommand is true if this is a query in the Citus main database that is started + * by a query from a different database. + */ +bool IsMainDBCommand = false; + +/* + * The transaction id of the query from the other database that started the + * main database query. + */ +FullTransactionId OuterXid; + +/* + * Shows if this is the Citus main database or not. We needed a variable instead of + * checking if this database's name is the same as MainDb because we sometimes need + * this value outside a transaction where we cannot reach the current database name. + */ +bool IsMainDB = true; + +/* + * Name of a superuser role to be used during main database connections. + */ +char *SuperuserRole = NULL; + + +/* + * start_management_transaction starts a management transaction + * in the main database by recording the outer transaction's transaction id and setting + * IsMainDBCommand to true. 
+ */ +Datum +start_management_transaction(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + OuterXid = PG_GETARG_FULLTRANSACTIONID(0); + IsMainDBCommand = true; + + Use2PCForCoordinatedTransaction(); + + PG_RETURN_VOID(); +} + + +/* + * execute_command_on_remote_nodes_as_user executes the query on the nodes + * other than the current node, using the user passed. + */ +Datum +execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + text *queryText = PG_GETARG_TEXT_P(0); + char *query = text_to_cstring(queryText); + + text *usernameText = PG_GETARG_TEXT_P(1); + char *username = text_to_cstring(usernameText); + + StringInfo queryToSend = makeStringInfo(); + + appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query, + ENABLE_METADATA_SYNC); + + SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data); + PG_RETURN_VOID(); +} + + +/* + * commit_management_command_2pc is a wrapper UDF for + * CoordinatedRemoteTransactionsCommit + */ +Datum +commit_management_command_2pc(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + RecoverTwoPhaseCommits(); + + PG_RETURN_VOID(); +} + + +/* + * RunCitusMainDBQuery creates a connection to Citus main database if necessary + * and runs the query over the connection in the main database. 
+ */ +void +RunCitusMainDBQuery(char *query) +{ + if (MainDBConnection == NULL) + { + if (strlen(SuperuserRole) == 0) + { + ereport(ERROR, (errmsg("No superuser role is given for Citus main " + "database connection"), + errhint("Set citus.superuser to a superuser role name"))); + } + int flags = 0; + MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName, + PostPortNumber, + SuperuserRole, + MainDb); + RemoteTransactionBegin(MainDBConnection); + } + + SendRemoteCommand(MainDBConnection, query); + + PGresult *result = GetRemoteCommandResult(MainDBConnection, true); + + if (!IsResponseOK(result)) + { + ReportResultError(MainDBConnection, result, ERROR); + } + + ForgetResults(MainDBConnection); +} + + +/* + * CleanCitusMainDBConnection closes and removes the connection to Citus main database. + */ +void +CleanCitusMainDBConnection(void) +{ + if (MainDBConnection == NULL) + { + return; + } + CloseConnection(MainDBConnection); + MainDBConnection = NULL; +} + /* * StartRemoteTransactionBegin initiates beginning the remote transaction in @@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port); if (workerNode != NULL) { - LogTransactionRecord(workerNode->groupId, transaction->preparedName); + LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid); } /* diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index d133d7be6..8b3333639 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "catalog/dependency.h" #include "common/hashfn.h" #include "nodes/print.h" +#include "postmaster/postmaster.h" #include "storage/fd.h" #include "utils/datum.h" #include "utils/guc.h" @@ -46,6 +47,7 @@ #include "distributed/multi_logical_replication.h" 
#include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" +#include "distributed/remote_commands.h" #include "distributed/repartition_join_execution.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/shard_cleaner.h" @@ -55,6 +57,9 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" +#define COMMIT_MANAGEMENT_COMMAND_2PC \ + "SELECT citus_internal.commit_management_command_2pc()" + CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) MemoryContext previousContext = MemoryContextSwitchTo(CitusXactCallbackContext); - if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED && + !IsMainDBCommand) { /* handles both already prepared and open transactions */ CoordinatedRemoteTransactionsCommit(); } + /* + * If this is a non-Citus main database we should try to commit the prepared + * transactions created by the Citus main database on the worker nodes. + */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC); + CleanCitusMainDBConnection(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) RemoveIntermediateResultsDirectories(); + CleanCitusMainDBConnection(); + /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { @@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) break; } + + /* + * If this is a non-Citus main database we should commit the Citus + * main database query. So if some error happens on the distributed main + * database query we wouldn't have committed the current query. 
+ */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery("COMMIT"); + } + /* * TODO: It'd probably be a good idea to force constraints and * such to 'immediate' here. Deferred triggers might try to send @@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * us to mark failed placements as invalid. Better don't use * this for anything important (i.e. DDL/metadata). */ - CoordinatedRemoteTransactionsCommit(); + if (IsMainDB) + { + CoordinatedRemoteTransactionsCommit(); + } CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 0ec5ba0a3..653b962db 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -29,10 +29,12 @@ #include "lib/stringinfo.h" #include "storage/lmgr.h" #include "storage/lock.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/xid8.h" #include "pg_version_constants.h" @@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) * prepared transaction should be committed. 
*/ void -LogTransactionRecord(int32 groupId, char *transactionName) +LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid) { Datum values[Natts_pg_dist_transaction]; bool isNulls[Natts_pg_dist_transaction]; @@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName) values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid); /* open transaction relation and insert new tuple */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), @@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode) continue; } + /* Check if the transaction is created by an outer transaction from a non-main database */ + bool outerXidIsNull = false; + Datum outerXidDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_outerxid, + tupleDescriptor, &outerXidIsNull); + + TransactionId outerXid = 0; + if (!outerXidIsNull) + { + FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum); + outerXid = XidFromFullTransactionId(outerFullXid); + } + + if (outerXid != 0) + { + bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid); + bool outerXactDidCommit = TransactionIdDidCommit(outerXid); + if (outerXactIsInProgress && !outerXactDidCommit) + { + /* + * The transaction is initiated from an outer transaction and the outer + * transaction is not yet committed, so we should not commit either. + * We remove this transaction from the pendingTransactionSet so it'll + * not be aborted by the loop below. + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &foundPreparedTransactionBeforeCommit); + continue; + } + else if (!outerXactIsInProgress && !outerXactDidCommit) + { + /* + * Since outer transaction isn't in progress and did not commit we need to + * abort the prepared transaction too. 
We do this by simply doing the same + * thing we would do for transactions that are initiated from the main + * database. + */ + continue; + } + else + { + /* + * Outer transaction did commit, so we can try to commit the prepared + * transaction too. + */ + } + } + /* * Remove the transaction from the pending list such that only transactions * that need to be aborted remain at the end. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 2af1e9a6c..9c8563de0 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -234,7 +234,8 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES || + targetWorkerSet == METADATA_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index cf24a8c81..bbbbdf9da 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e20c44535..9f4c0a24b 100644 --- 
a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void); extern List * CitusTableMetadataCreateCommandList(Oid relationId); extern List * NodeMetadataDropCommands(void); extern char * MarkObjectsDistributedCreateCommand(List *addresses, + List *names, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations); diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h index 815633b70..95658f782 100644 --- a/src/include/distributed/pg_dist_transaction.h +++ b/src/include/distributed/pg_dist_transaction.h @@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; * compiler constants for pg_dist_transaction * ---------------- */ -#define Natts_pg_dist_transaction 2 +#define Natts_pg_dist_transaction 3 #define Anum_pg_dist_transaction_groupid 1 #define Anum_pg_dist_transaction_gid 2 +#define Anum_pg_dist_transaction_outerxid 3 #endif /* PG_DIST_TRANSACTION_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 1c422da20..2b61c25bd 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId); +extern void RunCitusMainDBQuery(char *query); +extern void CleanCitusMainDBConnection(void); + +extern bool IsMainDBCommand; +extern bool IsMainDB; +extern char *SuperuserRole; +extern char *MainDb; +extern struct MultiConnection *MainDBConnection; + #endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 811dbb949..a4073875a 100644 
--- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,7 +17,8 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ -extern void LogTransactionRecord(int32 groupId, char *transactionName); +extern void LogTransactionRecord(int32 groupId, char *transactionName, + FullTransactionId outerXid); extern int RecoverTwoPhaseCommits(void); extern void DeleteWorkerTransactions(WorkerNode *workerNode); diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py new file mode 100644 index 000000000..cf824f926 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -0,0 +1,154 @@ +def test_main_commited_outer_not_yet(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db1") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db1") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" + ) + cur2.execute("COMMIT") + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # user should not be created on the worker because outer transaction is not committed yet + role_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + 
int(role_before_commit) == 0 + ), "role is on pg_dist_object despite not committing" + + # user should not be in pg_dist_object on the worker because outer transaction is not committed yet + pdo_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert int(pdo_before_commit) == 0, "role is created despite not committing" + + # commit in cur1 so the transaction recovery thinks this is a successful transaction + cur1.execute("COMMIT") + + # run the transaction recovery again after committing + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is created by the transaction recovery on the worker + role_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + int(role_after_commit) == 1 + ), "role is not created during recovery despite committing" + + # check that the user is on pg_dist_object on the worker after transaction recovery + pdo_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert ( + int(pdo_after_commit) == 1 + ), "role is not on pg_dist_object after recovery despite committing" + + c.sql("DROP DATABASE db1") + c.sql( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')" + ) + c.sql( + """ + SELECT run_command_on_workers($$ + DELETE FROM pg_dist_object + WHERE objid::regrole::text = 'u1' + $$) + """ + ) + + +def test_main_commited_outer_aborted(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db2") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db2") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of 
the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" + ) + cur2.execute("COMMIT") + + # abort cur1 so the transaction recovery thinks this is an aborted transaction + cur1.execute("ABORT") + + # check that the user is not yet created on the worker + role_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert int(role_before_recovery) == 0, "role is already created before recovery" + + # check that the user is not on pg_dist_object on the worker + pdo_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_before_recovery) == 0 + ), "role is already on pg_dist_object before recovery" + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is not created by the transaction recovery on the worker + role_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert ( + int(role_after_recovery) == 0 + ), "role is created during recovery despite aborting" + + # check that the user is not on pg_dist_object on the worker after transaction recovery + pdo_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_after_recovery) == 0 + ), "role is on pg_dist_object after recovery despite aborting" + + c.sql("DROP DATABASE db2") diff --git a/src/test/regress/expected/failure_non_main_db_2pc.out b/src/test/regress/expected/failure_non_main_db_2pc.out new file mode 100644 index 000000000..1e8558136 --- /dev/null +++ b/src/test/regress/expected/failure_non_main_db_2pc.out 
@@ -0,0 +1,154 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_1; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | user_1 +(3 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_2; +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +while executing command on localhost:xxxxx +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy 
+--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; +NOTICE: role "user_2" does not exist, skipping +SELECT citus_set_coordinator_host('localhost'); + citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. 
+SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db2 +CREATE USER user_3; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + + user_3 + user_3 +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + user_3 + user_3 + user_3 +(3 rows) + +DROP DATABASE other_db2; +DROP USER user_3; +\c - - - :master_port +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + result +--------------------------------------------------------------------- + DELETE 1 + DELETE 1 + DELETE 1 +(3 rows) + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 43f9c3b98..b9f489a1f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 12.2-1 ALTER EXTENSION citus UPDATE TO '12.2-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- + | function citus_internal.commit_management_command_2pc() void + | function 
citus_internal.execute_command_on_remote_nodes_as_user(text,text) void + | function citus_internal.mark_object_distributed(oid,text,oid) void + | function citus_internal.start_management_transaction(xid8) void | function citus_internal_database_command(text) void -(1 row) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 20cec7578..0a29a22af 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -64,7 +64,7 @@ SELECT recover_prepared_transactions(); (1 row) -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; groupid | gid --------------------------------------------------------------------- 122 | citus_122_should_do_nothing diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out new file mode 100644 index 000000000..67d7dad3f --- /dev/null +++ b/src/test/regress/expected/other_databases.out @@ -0,0 +1,130 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; +SET citus.next_shard_id TO 10231023; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. 
+\c other_db1 +SHOW citus.main_db; + citus.main_db +--------------------------------------------------------------------- + regression +(1 row) + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +ERROR: No superuser role is given for Citus main database connection +HINT: Set citus.superuser to a superuser role name +SET citus.superuser TO 'postgres'; +CREATE USER other_db_user1; +CREATE USER other_db_user2; +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +ERROR: division by zero +COMMIT; +CREATE USER other_db_user8; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; +NOTICE: role "other_db_user5" does not exist, skipping +NOTICE: role "other_db_user6" does not exist, skipping +NOTICE: role "other_db_user7" does not exist, skipping +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT 
citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); +ERROR: operation is not allowed +HINT: Run the command with a superuser. +\c other_db1 +CREATE USER other_db_user9; +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; +-- test from a worker +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. 
+\c other_db2 +CREATE USER worker_user1; +BEGIN; +CREATE USER worker_user2; +COMMIT; +BEGIN; +CREATE USER worker_user3; +ROLLBACK; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; +NOTICE: role "worker_user3" does not exist, skipping +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port +DROP SCHEMA other_databases; +DROP DATABASE other_db1; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 942e0336f..6d41ac058 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -56,13 +56,17 @@ ORDER BY 1; function citus_get_active_worker_nodes() function citus_get_node_clock() function citus_get_transaction_clock() + function citus_internal.commit_management_command_2pc() + function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.find_groupid_for_node(text,integer) + function citus_internal.mark_object_distributed(oid,text,oid) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func() function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function 
citus_internal.start_management_transaction(xid8) function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") @@ -344,5 +348,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(334 rows) +(338 rows) diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index afc4780bf..e1ad362b5 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -34,6 +34,7 @@ test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_mx_metadata_sync_multi_trans test: failure_connection_establishment +test: failure_non_main_db_2pc # this test syncs metadata to the workers test: failure_failover_to_local_execution diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 866b07f5f..5c9d8a45c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,6 +108,7 @@ test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes test: background_task_queue_monitor +test: other_databases # Causal clock test test: clock diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 95330c638..c9a85d523 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.enable_change_data_capture=on"); push(@pgOptions, "citus.stat_tenants_limit = 2"); push(@pgOptions, "citus.stat_tenants_track = 'ALL'"); +push(@pgOptions, "citus.main_db = 'regression'"); +push(@pgOptions, "citus.superuser = 'postgres'"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, 
"citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/failure_non_main_db_2pc.sql b/src/test/regress/sql/failure_non_main_db_2pc.sql new file mode 100644 index 000000000..74061ae34 --- /dev/null +++ b/src/test/regress/sql/failure_non_main_db_2pc.sql @@ -0,0 +1,75 @@ +SELECT citus.mitmproxy('conn.allow()'); + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; + +CREATE DATABASE other_db1; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db1 + +CREATE USER user_1; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + +\c other_db1 + +CREATE USER user_2; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; + +SELECT citus_set_coordinator_host('localhost'); + +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db2 + +CREATE USER user_3; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT 
= 'user_3'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + +DROP DATABASE other_db2; +DROP USER user_3; + +\c - - - :master_port + +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 2a6b4991b..e46917f35 100644 --- a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing'); SELECT recover_prepared_transactions(); -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; ROLLBACK PREPARED 'citus_122_should_do_nothing'; SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql new file mode 100644 index 000000000..629f74f45 --- /dev/null +++ b/src/test/regress/sql/other_databases.sql @@ -0,0 +1,98 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; + +SET citus.next_shard_id TO 10231023; + +CREATE DATABASE other_db1; + +\c other_db1 +SHOW citus.main_db; + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +SET citus.superuser TO 'postgres'; + +CREATE USER other_db_user1; +CREATE USER other_db_user2; + +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; + +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; + +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +COMMIT; + +CREATE USER 
other_db_user8; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; + +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); + +\c other_db1 +CREATE USER other_db_user9; + +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; + +-- test from a worker +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +\c other_db2 + +CREATE USER worker_user1; + +BEGIN; +CREATE USER worker_user2; +COMMIT; + +BEGIN; +CREATE USER worker_user3; +ROLLBACK; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; + +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port + +DROP SCHEMA other_databases; +DROP DATABASE other_db1; From 181b8ab6d5b8917db23ac69aca66fc11e432528a Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Tue, 26 Dec 2023 14:55:04 +0300 Subject: [PATCH 2/3] Adds additional alter database propagation support (#7253) DESCRIPTION: Adds database connection limit, rename and set tablespace propagation In this PR, below statement propagations are added alter database with allow_connections = ; alter database rename to ; alter database set TABLESPACE --------- Co-authored-by: Jelte Fennema-Nio Co-authored-by: Jelte Fennema-Nio Co-authored-by: Onur Tirtir --- src/backend/distributed/commands/database.c | 75 +++++++++++- .../commands/distribute_object_ops.c | 15 +++ .../deparser/deparse_database_stmts.c | 115 ++++++++++++------ src/include/distributed/commands.h | 2 + src/include/distributed/deparser.h | 1 + .../expected/alter_database_propagation.out | 110 ++++++++++++++--- .../create_drop_database_propagation.out | 14 +-- .../sql/alter_database_propagation.sql | 72 +++++++++-- .../sql/create_drop_database_propagation.sql | 8 +- 9 files changed, 324 insertions(+), 88 deletions(-) diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index db0fdb8c4..5bd84fb9c 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -192,6 +192,25 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, } +/* + * IsSetTablespaceStatement returns true if the statement is a SET TABLESPACE statement, + * false otherwise. + */ +static bool +IsSetTablespaceStatement(AlterDatabaseStmt *stmt) +{ + DefElem *def = NULL; + foreach_ptr(def, stmt->options) + { + if (strcmp(def->defname, "tablespace") == 0) + { + return true; + } + } + return false; +} + + /* * PreprocessAlterDatabaseStmt is executed before the statement is applied to the local * postgres instance. 
@@ -203,22 +222,38 @@ List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { - if (!ShouldPropagate()) + bool missingOk = false; + AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname, + missingOk); + + if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress))) { return NIL; } - AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node); - EnsureCoordinator(); char *sql = DeparseTreeNode((Node *) stmt); List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) sql, + sql, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + if (IsSetTablespaceStatement(stmt)) + { + /* + * Set tablespace does not work inside a transaction. Therefore, we need to use + * NontransactionalNodeDDLTaskList to run the command on the workers outside + * the transaction block. + */ + + return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands); + } + else + { + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + } } @@ -256,6 +291,36 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString, #endif +/* + * PostprocessAlterDatabaseRenameStmt is executed after the statement is applied to the local + * postgres instance. In this stage we prepare ALTER DATABASE RENAME statement to be run on + * all workers.
+ */ +List * +PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString) +{ + bool missingOk = false; + RenameStmt *stmt = castNode(RenameStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->newname, + missingOk); + + if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress))) + { + return NIL; + } + + EnsureCoordinator(); + + char *sql = DeparseTreeNode((Node *) stmt); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + /* * PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local * postgres instance. diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 169fc6444..5bf23c92f 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -522,6 +522,16 @@ static DistributeObjectOps Database_Set = { .markDistributed = false, }; +static DistributeObjectOps Database_Rename = { + .deparse = DeparseAlterDatabaseRenameStmt, + .qualify = NULL, + .preprocess = NULL, + .postprocess = PostprocessAlterDatabaseRenameStmt, + .objectType = OBJECT_DATABASE, + .operationType = DIST_OPS_ALTER, + .address = NULL, + .markDistributed = false, +}; static DistributeObjectOps Domain_Alter = { .deparse = DeparseAlterDomainStmt, @@ -2087,6 +2097,11 @@ GetDistributeObjectOps(Node *node) return &Collation_Rename; } + case OBJECT_DATABASE: + { + return &Database_Rename; + } + case OBJECT_DOMAIN: { return &Domain_Rename; diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 3614ba797..30ac3f32c 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -30,12 +30,14 @@ static void 
AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt); static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); -static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); static void AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt); static void AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt); static void AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt); +static void AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt); +static void AppendGrantDatabases(StringInfo buf, GrantStmt *stmt); +static void AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname); -const DefElemOptionFormat create_database_option_formats[] = { +const DefElemOptionFormat createDatabaseOptionFormats[] = { { "owner", " OWNER %s", OPTION_FORMAT_STRING }, { "template", " TEMPLATE %s", OPTION_FORMAT_STRING }, { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR }, @@ -53,6 +55,14 @@ const DefElemOptionFormat create_database_option_formats[] = { { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN } }; + +const DefElemOptionFormat alterDatabaseOptionFormats[] = { + { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN }, + { "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN }, + { "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER }, +}; + + char * DeparseAlterDatabaseOwnerStmt(Node *node) { @@ -112,48 +122,63 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt) static void -AppendDefElemConnLimit(StringInfo buf, DefElem *def) +AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt) { - appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def)); + if (list_length(stmt->options) == 0) + { + elog(ERROR, "got unexpected number of options for ALTER DATABASE"); + } + + if (stmt->options) + { + DefElem *firstOption = 
linitial(stmt->options); + if (strcmp(firstOption->defname, "tablespace") == 0) + { + AppendAlterDatabaseSetTablespace(buf, firstOption, stmt->dbname); + + /* SET tablespace cannot be combined with other options */ + return; + } + + + appendStringInfo(buf, "ALTER DATABASE %s WITH", + quote_identifier(stmt->dbname)); + + AppendBasicAlterDatabaseOptions(buf, stmt); + } + + appendStringInfo(buf, ";"); } static void -AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt) +AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname) { - appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname)); + appendStringInfo(buf, + "ALTER DATABASE %s SET TABLESPACE %s", + quote_identifier(dbname), quote_identifier(defGetString(def))); +} - if (stmt->options) + +/* + * AppendBasicAlterDatabaseOptions appends basic ALTER DATABASE options to a string buffer. + * Basic options are those that can be appended to the ALTER DATABASE statement + * after the "WITH" keyword.(i.e. ALLOW_CONNECTIONS, CONNECTION LIMIT, IS_TEMPLATE) + * For example, the tablespace option is not a basic option since it is defined via SET keyword. + * + * This function takes a string buffer and an AlterDatabaseStmt as input. + * It appends the basic options to the string buffer. 
+ * + */ +static void +AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt) +{ + DefElem *def = NULL; + foreach_ptr(def, stmt->options) { - ListCell *cell = NULL; - appendStringInfo(buf, "WITH "); - foreach(cell, stmt->options) - { - DefElem *def = castNode(DefElem, lfirst(cell)); - if (strcmp(def->defname, "is_template") == 0) - { - appendStringInfo(buf, "IS_TEMPLATE %s", - quote_literal_cstr(strVal(def->arg))); - } - else if (strcmp(def->defname, "connection_limit") == 0) - { - AppendDefElemConnLimit(buf, def); - } - else if (strcmp(def->defname, "allow_connections") == 0) - { - ereport(ERROR, - errmsg("ALLOW_CONNECTIONS is not supported")); - } - else - { - ereport(ERROR, - errmsg("unrecognized ALTER DATABASE option: %s", - def->defname)); - } - } + DefElemOptionToStatement(buf, def, alterDatabaseOptionFormats, lengthof( + alterDatabaseOptionFormats)); } - - appendStringInfo(buf, ";"); } @@ -216,6 +241,22 @@ AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt) } +char * +DeparseAlterDatabaseRenameStmt(Node *node) +{ + RenameStmt *stmt = (RenameStmt *) node; + + StringInfoData str; + initStringInfo(&str); + + appendStringInfo(&str, "ALTER DATABASE %s RENAME TO %s", + quote_identifier(stmt->subname), + quote_identifier(stmt->newname)); + + return str.data; +} + + char * DeparseAlterDatabaseSetStmt(Node *node) { @@ -246,8 +287,8 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) DefElem *option = NULL; foreach_ptr(option, stmt->options) { - DefElemOptionToStatement(buf, option, create_database_option_formats, - lengthof(create_database_option_formats)); + DefElemOptionToStatement(buf, option, createDatabaseOptionFormats, + lengthof(createDatabaseOptionFormats)); } } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 8b6e9001a..2bd837d44 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -244,6 +244,8 @@ extern List * 
DropDatabaseStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess); extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missingOk, bool isPostprocess); +extern List * GenerateGrantDatabaseCommandList(void); +extern List * PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString); extern void EnsureSupportedCreateDatabaseCommand(CreatedbStmt *stmt); extern char * CreateDatabaseDDLCommand(Oid dbId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index d66bdb933..e88372801 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -251,6 +251,7 @@ extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node); extern char * DeparseAlterDatabaseSetStmt(Node *node); extern char * DeparseCreateDatabaseStmt(Node *node); extern char * DeparseDropDatabaseStmt(Node *node); +extern char * DeparseAlterDatabaseRenameStmt(Node *node); /* forward declaration for deparse_publication_stmts.c */ diff --git a/src/test/regress/expected/alter_database_propagation.out b/src/test/regress/expected/alter_database_propagation.out index 0ce217749..1a56f1338 100644 --- a/src/test/regress/expected/alter_database_propagation.out +++ b/src/test/regress/expected/alter_database_propagation.out @@ -1,38 +1,30 @@ set citus.log_remote_commands = true; set citus.grep_remote_commands = '%ALTER DATABASE%'; --- since ALLOW_CONNECTIONS alter option should be executed in a different database --- and since we don't have a multiple database support for now, --- this statement will get error -alter database regression ALLOW_CONNECTIONS false; -ERROR: ALLOW_CONNECTIONS is not supported alter database regression with CONNECTION LIMIT 100; -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 
100; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with CONNECTION LIMIT -1; -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; +NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE true; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx alter database regression with IS_TEMPLATE false; -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false'; +NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false; DETAIL: on server 
postgres@localhost:xxxxx connectionId: xxxxxxx --- this statement will get error since we don't have a multiple database support for now -alter database regression rename to regression2; -ERROR: current database cannot be renamed alter database regression set default_transaction_read_only = true; NOTICE: issuing ALTER DATABASE regression SET default_transaction_read_only = 'true' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -147,4 +139,86 @@ NOTICE: issuing ALTER DATABASE regression RESET lock_timeout DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing ALTER DATABASE regression RESET lock_timeout DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.enable_create_database_propagation=on; +create database "regression!'2"; +alter database "regression!'2" with CONNECTION LIMIT 100; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +alter database "regression!'2" with IS_TEMPLATE false; +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - 
- - :worker_1_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - - - :worker_2_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; +\c - - - :master_port +set citus.log_remote_commands = true; +set citus.grep_remote_commands = '%ALTER DATABASE%'; +alter database "regression!'2" set TABLESPACE alter_db_tablespace; +NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.enable_create_database_propagation=on; +alter database "regression!'2" rename to regression3; +NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- check that the local database rename and alter comnmand is not propagated +set citus.enable_create_database_propagation=off; +CREATE database local_regression; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. 
+alter DATABASE local_regression with CONNECTION LIMIT 100; +alter DATABASE local_regression rename to local_regression2; +drop database local_regression2; +set citus.enable_create_database_propagation=on; +drop database regression3; +create database "regression!'4"; +SELECT result FROM run_command_on_all_nodes( + $$ + ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape" + $$ +); + result +--------------------------------------------------------------------- + ALTER TABLESPACE + ALTER TABLESPACE + ALTER TABLESPACE +(3 rows) + +alter database "regression!'4" set TABLESPACE "ts-needs\!escape"; +NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +drop database "regression!'4"; set citus.log_remote_commands = false; +set citus.enable_create_database_propagation=off; +SELECT result FROM run_command_on_all_nodes( + $$ + drop tablespace "ts-needs\!escape" + $$ +); + result +--------------------------------------------------------------------- + DROP TABLESPACE + DROP TABLESPACE + DROP TABLESPACE +(3 rows) + diff --git a/src/test/regress/expected/create_drop_database_propagation.out b/src/test/regress/expected/create_drop_database_propagation.out index e0172f3e8..eb637f8c2 100644 --- a/src/test/regress/expected/create_drop_database_propagation.out +++ b/src/test/regress/expected/create_drop_database_propagation.out @@ -209,19 +209,7 @@ SELECT result FROM run_command_on_all_nodes( CREATE USER "role-needs\!escape"; CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape"; -- Rename it to make check_database_on_all_nodes happy. --- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually. 
-SELECT result FROM run_command_on_all_nodes( - $$ - ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape - $$ -); - result ---------------------------------------------------------------------- - ALTER DATABASE - ALTER DATABASE - ALTER DATABASE -(3 rows) - +ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape; SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type; node_type | result --------------------------------------------------------------------- diff --git a/src/test/regress/sql/alter_database_propagation.sql b/src/test/regress/sql/alter_database_propagation.sql index 2b9d3ac33..4904919a6 100644 --- a/src/test/regress/sql/alter_database_propagation.sql +++ b/src/test/regress/sql/alter_database_propagation.sql @@ -1,20 +1,12 @@ set citus.log_remote_commands = true; set citus.grep_remote_commands = '%ALTER DATABASE%'; - --- since ALLOW_CONNECTIONS alter option should be executed in a different database --- and since we don't have a multiple database support for now, --- this statement will get error -alter database regression ALLOW_CONNECTIONS false; - - alter database regression with CONNECTION LIMIT 100; alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50; alter database regression with CONNECTION LIMIT -1; alter database regression with IS_TEMPLATE true; alter database regression with IS_TEMPLATE false; --- this statement will get error since we don't have a multiple database support for now -alter database regression rename to regression2; + alter database regression set default_transaction_read_only = true; @@ -56,4 +48,66 @@ alter database regression set lock_timeout from current; alter database regression set lock_timeout to DEFAULT; alter database regression RESET lock_timeout; +set citus.enable_create_database_propagation=on; +create database "regression!'2"; +alter database "regression!'2" with CONNECTION LIMIT 100; +alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 
50; +alter database "regression!'2" with IS_TEMPLATE false; + + + + +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :worker_1_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :worker_2_port +\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace'; + +\c - - - :master_port + +set citus.log_remote_commands = true; +set citus.grep_remote_commands = '%ALTER DATABASE%'; + +alter database "regression!'2" set TABLESPACE alter_db_tablespace; + +set citus.enable_create_database_propagation=on; +alter database "regression!'2" rename to regression3; + +-- check that the local database rename and alter comnmand is not propagated +set citus.enable_create_database_propagation=off; +CREATE database local_regression; + +alter DATABASE local_regression with CONNECTION LIMIT 100; +alter DATABASE local_regression rename to local_regression2; +drop database local_regression2; + +set citus.enable_create_database_propagation=on; + +drop database regression3; + +create database "regression!'4"; + + +SELECT result FROM run_command_on_all_nodes( + $$ + ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape" + $$ +); + +alter database "regression!'4" set TABLESPACE "ts-needs\!escape"; + +drop database "regression!'4"; + set citus.log_remote_commands = false; +set citus.enable_create_database_propagation=off; + +SELECT result FROM run_command_on_all_nodes( + $$ + drop tablespace "ts-needs\!escape" + $$ +); diff --git a/src/test/regress/sql/create_drop_database_propagation.sql b/src/test/regress/sql/create_drop_database_propagation.sql index c83548d68..c71841eee 100644 --- a/src/test/regress/sql/create_drop_database_propagation.sql +++ b/src/test/regress/sql/create_drop_database_propagation.sql @@ -129,13 +129,8 @@ CREATE USER 
"role-needs\!escape"; CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape"; -- Rename it to make check_database_on_all_nodes happy. --- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually. -SELECT result FROM run_command_on_all_nodes( - $$ - ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape - $$ -); +ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape; SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type; -- test database syncing after node addition @@ -541,6 +536,7 @@ REVOKE CONNECT ON DATABASE test_db FROM propagated_role; DROP DATABASE test_db; DROP ROLE propagated_role, non_propagated_role; + --clean up resources created by this test -- DROP TABLESPACE is not supported, so we need to drop it manually. From c3579eef0626563f62c42f12629af8a4dfcb000f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Thu, 28 Dec 2023 15:15:58 +0300 Subject: [PATCH 3/3] Adds REASSIGN OWNED BY propagation (#7319) DESCRIPTION: Adds REASSIGN OWNED BY propagation This pull request introduces the propagation of the "Reassign owned by" statement. It accommodates both local and distributed roles for both the old and new assignments. However, when the old role is a local role, it undergoes filtering and is not propagated. On the other hand, if the new role is a local role, the process involves first creating the role on worker nodes before propagating the "Reassign owned" statement. 
--- .../distributed/commands/dependencies.c | 156 +++++++++++--- .../commands/distribute_object_ops.c | 16 ++ src/backend/distributed/commands/owned.c | 81 ++++++++ .../deparser/deparse_owned_stmts.c | 26 ++- .../transaction/transaction_management.c | 13 +- src/include/distributed/commands.h | 1 + src/include/distributed/deparser.h | 1 + src/include/distributed/metadata_utility.h | 1 + .../distributed/transaction_management.h | 2 +- .../citus_schema_distribute_undistribute.out | 18 +- .../regress/expected/citus_schema_move.out | 9 +- src/test/regress/expected/reassign_owned.out | 194 ++++++++++++++++++ src/test/regress/multi_1_schedule | 1 + .../citus_schema_distribute_undistribute.sql | 4 +- src/test/regress/sql/citus_schema_move.sql | 2 +- src/test/regress/sql/reassign_owned.sql | 141 +++++++++++++ 16 files changed, 605 insertions(+), 61 deletions(-) create mode 100644 src/test/regress/expected/reassign_owned.out create mode 100644 src/test/regress/sql/reassign_owned.sql diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index d51678c04..3b3b3cfd6 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -31,20 +31,90 @@ #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +typedef enum RequiredObjectSet +{ + REQUIRE_ONLY_DEPENDENCIES = 1, + REQUIRE_OBJECT_AND_DEPENDENCIES = 2, +} RequiredObjectSet; + static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress); static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress); static int ObjectAddressComparator(const void *a, const void *b); static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); +static void EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target, + RequiredObjectSet requiredObjectSet); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static bool 
ShouldPropagateObject(const ObjectAddress *address); static char * DropTableIfExistsCommand(Oid relationId); /* - * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes - * sure these are available on all nodes. If not available they will be created on the - * nodes via a separate session that will be committed directly so that the objects are - * visible to potentially multiple sessions creating the shards. + * EnsureObjectAndDependenciesExistOnAllNodes is a wrapper around + * EnsureRequiredObjectSetExistOnAllNodes to ensure the "object itself" (together + * with its dependencies) is available on all nodes. + * + * Different than EnsureDependenciesExistOnAllNodes, we return early if the + * target object is distributed already. + * + * The reason why we don't do the same in EnsureDependenciesExistOnAllNodes + * is that it is used when altering an object too and hence the target object + * may instantly have a dependency that needs to be propagated now. For example, + * when "GRANT non_dist_role TO dist_role" is executed, we need to propagate + * "non_dist_role" to all nodes before propagating the "GRANT" command itself. + * For this reason, we call EnsureDependenciesExistOnAllNodes for "dist_role" + * and it would automatically discover that "non_dist_role" is a dependency of + * "dist_role" and propagate it beforehand. + * + * However, when we're requested to create the target object itself (and + * implicitly its dependencies), we're sure that we're not altering the target + * object itself, hence we can return early if the target object is already + * distributed. This is the case, for example, when + * "REASSIGN OWNED BY dist_role TO non_dist_role" is executed. In that case, + * "non_dist_role" is not a dependency of "dist_role" but we want to distribute + * "non_dist_role" beforehand and we call this function for "non_dist_role", + * not for "dist_role".
+ * + * See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this + * function deals with an object created within the same transaction. + */ +void +EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target) +{ + if (IsAnyObjectDistributed(list_make1((ObjectAddress *) target))) + { + return; + } + EnsureRequiredObjectSetExistOnAllNodes(target, REQUIRE_OBJECT_AND_DEPENDENCIES); +} + + +/* + * EnsureDependenciesExistOnAllNodes is a wrapper around + * EnsureRequiredObjectSetExistOnAllNodes to ensure "all dependencies" of given + * object --but not the object itself-- are available on all nodes. + * + * See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this + * function deals with an object created within the same transaction. + */ +static void +EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) +{ + EnsureRequiredObjectSetExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES); +} + + +/* + * EnsureRequiredObjectSetExistOnAllNodes finds all the dependencies that we support and makes + * sure these are available on all nodes if required object set is REQUIRE_ONLY_DEPENDENCIES. + * Otherwise, i.e., if required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this + * function creates the object itself on all nodes too. This function ensures that each + * of the dependencies are supported by Citus but doesn't check the same for the target + * object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we assume that + * callers don't call this function for an unsupported object at all. + * + * If not available, they will be created on the nodes via a separate session that will be + * committed directly so that the objects are visible to potentially multiple sessions creating + * the shards. * * Note; only the actual objects are created via a separate session, the records to * pg_dist_object are created in this session.
As a side effect the objects could be @@ -55,29 +125,52 @@ static char * DropTableIfExistsCommand(Oid relationId); * postgres native CREATE IF NOT EXISTS, or citus helper functions. */ static void -EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) +EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target, + RequiredObjectSet requiredObjectSet) { - List *dependenciesWithCommands = NIL; + Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES || + requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES); + + + List *objectsWithCommands = NIL; List *ddlCommands = NULL; /* * If there is any unsupported dependency or circular dependency exists, Citus can * not ensure dependencies will exist on all nodes. + * + * Note that we don't check whether "target" is distributable (in case + * REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers + * to not even call this function if Citus doesn't know how to propagate + * "target" object itself. */ EnsureDependenciesCanBeDistributed(target); /* collect all dependencies in creation order and get their ddl commands */ - List *dependencies = GetDependenciesForObject(target); - ObjectAddress *dependency = NULL; - foreach_ptr(dependency, dependencies) + List *objectsToBeCreated = GetDependenciesForObject(target); + + /* + * Append the target object to make sure that it's created after its + * dependencies are created, if requested. 
+ */ + if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES) { - List *dependencyCommands = GetDependencyCreateDDLCommands(dependency); + ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress)); + *targetCopy = *target; + + objectsToBeCreated = lappend(objectsToBeCreated, targetCopy); + } + + ObjectAddress *object = NULL; + foreach_ptr(object, objectsToBeCreated) + { + List *dependencyCommands = GetDependencyCreateDDLCommands(object); ddlCommands = list_concat(ddlCommands, dependencyCommands); - /* create a new list with dependencies that actually created commands */ + /* create a new list with objects that actually created commands */ if (list_length(dependencyCommands) > 0) { - dependenciesWithCommands = lappend(dependenciesWithCommands, dependency); + objectsWithCommands = lappend(objectsWithCommands, object); } } if (list_length(ddlCommands) <= 0) @@ -100,26 +193,28 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock); /* - * Lock dependent objects explicitly to make sure same DDL command won't be sent + * Lock objects to be created explicitly to make sure same DDL command won't be sent * multiple times from parallel sessions. * - * Sort dependencies that will be created on workers to not to have any deadlock + * Sort the objects that will be created on workers so as not to have any deadlock * issue if different sessions are creating different objects.
*/ - List *addressSortedDependencies = SortList(dependenciesWithCommands, + List *addressSortedDependencies = SortList(objectsWithCommands, ObjectAddressComparator); - foreach_ptr(dependency, addressSortedDependencies) + foreach_ptr(object, addressSortedDependencies) { - LockDatabaseObject(dependency->classId, dependency->objectId, - dependency->objectSubId, ExclusiveLock); + LockDatabaseObject(object->classId, object->objectId, + object->objectSubId, ExclusiveLock); } /* - * We need to propagate dependencies via the current user's metadata connection if - * any dependency for the target is created in the current transaction. Our assumption - * is that if we rely on a dependency created in the current transaction, then the - * current user, most probably, has permissions to create the target object as well. + * We need to propagate objects via the current user's metadata connection if + * any of the objects that we're interested in are created in the current transaction. + * Our assumption is that if we rely on an object created in the current transaction, + * then the current user, most probably, has permissions to create the target object + * as well. + * * Note that, user still may not be able to create the target due to no permissions * for any of its dependencies. But this is ok since it should be rare. * @@ -127,7 +222,18 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * have visibility issues since propagated dependencies would be invisible to * the separate connection until we locally commit. 
*/ - if (HasAnyDependencyInPropagatedObjects(target)) + List *createdObjectList = GetAllSupportedDependenciesForObject(target); + + /* consider target as well if we're requested to create it too */ + if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES) + { + ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress)); + *targetCopy = *target; + + createdObjectList = lappend(createdObjectList, targetCopy); + } + + if (HasAnyObjectInPropagatedObjects(createdObjectList)) { SendCommandListToRemoteNodesWithMetadata(ddlCommands); } @@ -150,7 +256,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * that objects have been created on remote nodes before marking them * distributed, so MarkObjectDistributed wouldn't fail. */ - foreach_ptr(dependency, dependenciesWithCommands) + foreach_ptr(object, objectsWithCommands) { /* * pg_dist_object entries must be propagated with the super user, since @@ -160,7 +266,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * Only dependent object's metadata should be propagated with super user. * Metadata of the table itself must be propagated with the current user. 
*/ - MarkObjectDistributedViaSuperUser(dependency); + MarkObjectDistributedViaSuperUser(object); } } diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 5bf23c92f..eb454d70d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -275,6 +275,17 @@ static DistributeObjectOps Any_CreateRole = { .address = CreateRoleStmtObjectAddress, .markDistributed = true, }; + +static DistributeObjectOps Any_ReassignOwned = { + .deparse = DeparseReassignOwnedStmt, + .qualify = NULL, + .preprocess = NULL, + .postprocess = PostprocessReassignOwnedStmt, + .operationType = DIST_OPS_ALTER, + .address = NULL, + .markDistributed = false, +}; + static DistributeObjectOps Any_DropOwned = { .deparse = DeparseDropOwnedStmt, .qualify = NULL, @@ -1878,6 +1889,11 @@ GetDistributeObjectOps(Node *node) return &Any_DropOwned; } + case T_ReassignOwnedStmt: + { + return &Any_ReassignOwned; + } + case T_DropStmt: { DropStmt *stmt = castNode(DropStmt, node); diff --git a/src/backend/distributed/commands/owned.c b/src/backend/distributed/commands/owned.c index 3b4b043f8..30374ce26 100644 --- a/src/backend/distributed/commands/owned.c +++ b/src/backend/distributed/commands/owned.c @@ -48,6 +48,9 @@ #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" + +static ObjectAddress * GetNewRoleAddress(ReassignOwnedStmt *stmt); + /* * PreprocessDropOwnedStmt finds the distributed role out of the ones * being dropped and unmarks them distributed and creates the drop statements @@ -89,3 +92,81 @@ PreprocessDropOwnedStmt(Node *node, const char *queryString, return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } + + +/* + * PostprocessReassignOwnedStmt takes a Node pointer representing a REASSIGN + * OWNED statement and performs any necessary post-processing after the statement + * has been executed locally. 
+ * + * We filter out local roles in OWNED BY clause before deparsing the command, + * meaning that we skip reassigning what is owned by local roles. However, + * if the role specified in TO clause is local, we automatically distribute + * it before deparsing the command. + */ +List * +PostprocessReassignOwnedStmt(Node *node, const char *queryString) +{ + ReassignOwnedStmt *stmt = castNode(ReassignOwnedStmt, node); + List *allReassignRoles = stmt->roles; + + List *distributedReassignRoles = FilterDistributedRoles(allReassignRoles); + + if (list_length(distributedReassignRoles) <= 0) + { + return NIL; + } + + if (!ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + stmt->roles = distributedReassignRoles; + char *sql = DeparseTreeNode((Node *) stmt); + stmt->roles = allReassignRoles; + + ObjectAddress *newRoleAddress = GetNewRoleAddress(stmt); + + /* + * We temporarily enable create / alter role propagation to properly + * propagate the role specified in TO clause. + */ + int saveNestLevel = NewGUCNestLevel(); + set_config_option("citus.enable_create_role_propagation", "on", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + set_config_option("citus.enable_alter_role_propagation", "on", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + set_config_option("citus.enable_alter_role_set_propagation", "on", + (superuser() ? 
PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + EnsureObjectAndDependenciesExistOnAllNodes(newRoleAddress); + + /* rollback GUCs to the state before this session */ + AtEOXact_GUC(true, saveNestLevel); + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} + + +/* + * GetNewRoleAddress returns the ObjectAddress of the new role + */ +static ObjectAddress * +GetNewRoleAddress(ReassignOwnedStmt *stmt) +{ + Oid roleOid = get_role_oid(stmt->newrole->rolename, false); + ObjectAddress *address = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*address, AuthIdRelationId, roleOid); + return address; +} diff --git a/src/backend/distributed/deparser/deparse_owned_stmts.c b/src/backend/distributed/deparser/deparse_owned_stmts.c index af7fa0968..93572a4ee 100644 --- a/src/backend/distributed/deparser/deparse_owned_stmts.c +++ b/src/backend/distributed/deparser/deparse_owned_stmts.c @@ -71,7 +71,7 @@ AppendRoleList(StringInfo buf, List *roleList) { Node *roleNode = (Node *) lfirst(cell); Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv)); - char const *rolename = NULL; + const char *rolename = NULL; if (IsA(roleNode, RoleSpec)) { rolename = RoleSpecString((RoleSpec *) roleNode, true); @@ -83,3 +83,27 @@ AppendRoleList(StringInfo buf, List *roleList) } } } + + +static void +AppendReassignOwnedStmt(StringInfo buf, ReassignOwnedStmt *stmt) +{ + appendStringInfo(buf, "REASSIGN OWNED BY "); + + AppendRoleList(buf, stmt->roles); + const char *newRoleName = RoleSpecString(stmt->newrole, true); + appendStringInfo(buf, " TO %s", newRoleName); +} + + +char * +DeparseReassignOwnedStmt(Node *node) +{ + ReassignOwnedStmt *stmt = castNode(ReassignOwnedStmt, node); + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + AppendReassignOwnedStmt(&buf, stmt); + + return buf.data; +} diff --git 
a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 8b3333639..29f5b367e 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -1171,18 +1171,17 @@ ResetPropagatedObjects(void) /* - * HasAnyDependencyInPropagatedObjects decides if any dependency of given object is + * HasAnyObjectInPropagatedObjects decides if any of the objects in given list are * propagated in the current transaction. */ bool -HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) +HasAnyObjectInPropagatedObjects(List *objectList) { - List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress); - ObjectAddress *dependency = NULL; - foreach_ptr(dependency, dependencyList) + ObjectAddress *object = NULL; + foreach_ptr(object, objectList) { /* first search in root transaction */ - if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency)) + if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, object)) { return true; } @@ -1195,7 +1194,7 @@ HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) SubXactContext *state = NULL; foreach_ptr(state, activeSubXactContexts) { - if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency)) + if (DependencyInPropagatedObjectsHash(state->propagatedObjects, object)) { return true; } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 2bd837d44..99bf81843 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -442,6 +442,7 @@ extern List * CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok, bool /* owned.c - forward declarations */ extern List * PreprocessDropOwnedStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); +extern List * PostprocessReassignOwnedStmt(Node *node, const char 
*queryString); /* policy.c - forward declarations */ extern List * CreatePolicyCommands(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index e88372801..22636b401 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -231,6 +231,7 @@ extern void QualifyAlterRoleSetStmt(Node *stmt); extern char * DeparseCreateRoleStmt(Node *stmt); extern char * DeparseDropRoleStmt(Node *stmt); extern char * DeparseGrantRoleStmt(Node *stmt); +extern char * DeparseReassignOwnedStmt(Node *node); /* forward declarations for deparse_owned_stmts.c */ extern char * DeparseDropOwnedStmt(Node *node); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 04a4b500b..737e1283b 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -386,6 +386,7 @@ extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operat extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); extern void UndistributeTables(List *relationIdList); +extern void EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target); extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets); extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const ObjectAddress * diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index fa762682b..ee3153d10 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -163,7 +163,7 @@ extern bool MaybeExecutingUDF(void); extern void TrackPropagatedObject(const ObjectAddress *objectAddress); extern void TrackPropagatedTableAndSequences(Oid relationId); extern void ResetPropagatedObjects(void); -extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress); +extern bool 
HasAnyObjectInPropagatedObjects(List *objectList); /* initialization function(s) */ extern void InitializeTransactionManagement(void); diff --git a/src/test/regress/expected/citus_schema_distribute_undistribute.out b/src/test/regress/expected/citus_schema_distribute_undistribute.out index ae08b6c6a..352fc776b 100644 --- a/src/test/regress/expected/citus_schema_distribute_undistribute.out +++ b/src/test/regress/expected/citus_schema_distribute_undistribute.out @@ -285,14 +285,7 @@ SELECT citus_schema_undistribute('tenant1'); ERROR: must be owner of schema tenant1 -- assign all tables to dummyregular except table5 SET role tenantuser; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO dummyregular; $$); - result ---------------------------------------------------------------------- - REASSIGN OWNED - REASSIGN OWNED - REASSIGN OWNED -(3 rows) - +REASSIGN OWNED BY tenantuser TO dummyregular; CREATE TABLE tenant1.table5(id int); -- table owner check fails the distribution SET role dummyregular; @@ -366,14 +359,7 @@ SELECT result FROM run_command_on_all_nodes($$ SELECT array_agg(logicalrelid ORD (3 rows) RESET role; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO tenantuser; $$); - result ---------------------------------------------------------------------- - REASSIGN OWNED - REASSIGN OWNED - REASSIGN OWNED -(3 rows) - +REASSIGN OWNED BY dummyregular TO tenantuser; DROP USER dummyregular; CREATE USER dummysuper superuser; SET role dummysuper; diff --git a/src/test/regress/expected/citus_schema_move.out b/src/test/regress/expected/citus_schema_move.out index 160d2062b..9c25919d6 100644 --- a/src/test/regress/expected/citus_schema_move.out +++ b/src/test/regress/expected/citus_schema_move.out @@ -189,14 +189,7 @@ SELECT citus_schema_move('s2', 'dummy_node', 1234); ERROR: must be owner of schema s2 -- assign all tables to regularuser RESET ROLE; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY 
tenantuser TO regularuser; $$); - result ---------------------------------------------------------------------- - REASSIGN OWNED - REASSIGN OWNED - REASSIGN OWNED -(3 rows) - +REASSIGN OWNED BY tenantuser TO regularuser; GRANT USAGE ON SCHEMA citus_schema_move TO regularuser; SET ROLE regularuser; SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport diff --git a/src/test/regress/expected/reassign_owned.out b/src/test/regress/expected/reassign_owned.out new file mode 100644 index 000000000..366e6d945 --- /dev/null +++ b/src/test/regress/expected/reassign_owned.out @@ -0,0 +1,194 @@ +CREATE ROLE distributed_source_role1; +create ROLE "distributed_source_role-\!"; +CREATE ROLE "distributed_target_role1-\!"; +set citus.enable_create_role_propagation to off; +create ROLE local_target_role1; +NOTICE: not propagating CREATE ROLE/USER commands to other nodes +HINT: Connect to other nodes directly to manually create all necessary users and roles. +\c - - - :worker_1_port +set citus.enable_create_role_propagation to off; +CREATE ROLE local_target_role1; +NOTICE: not propagating CREATE ROLE/USER commands to other nodes +HINT: Connect to other nodes directly to manually create all necessary users and roles. +\c - - - :master_port +set citus.enable_create_role_propagation to off; +create role local_source_role1; +NOTICE: not propagating CREATE ROLE/USER commands to other nodes +HINT: Connect to other nodes directly to manually create all necessary users and roles. 
+reset citus.enable_create_role_propagation; +GRANT CREATE ON SCHEMA public TO distributed_source_role1,"distributed_source_role-\!"; +SET ROLE distributed_source_role1; +CREATE TABLE public.test_table (col1 int); +set role "distributed_source_role-\!"; +CREATE TABLE public.test_table2 (col2 int); +RESET ROLE; +select create_distributed_table('test_table', 'col1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('test_table2', 'col2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table', 'test_table2') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}] + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}] + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}] +(3 rows) + +--tests for reassing owned by with multiple distributed roles and a local role to a distributed role +--local role should be ignored +set citus.log_remote_commands to on; +set citus.grep_remote_commands = '%REASSIGN OWNED BY%'; +REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO "distributed_target_role1-\!"; +NOTICE: issuing REASSIGN OWNED BY 
distributed_source_role1, "distributed_source_role-\!" TO "distributed_target_role1-\!" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO "distributed_target_role1-\!" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +reset citus.grep_remote_commands; +reset citus.log_remote_commands; +--check if the owner changed to "distributed_target_role1-\!" +RESET citus.log_remote_commands; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table', 'test_table2') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}] + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}] + [{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}] +(3 rows) + +--tests for reassing owned by with multiple distributed roles and a local role to a local role +--local role should be ignored +SET ROLE distributed_source_role1; +CREATE TABLE public.test_table3 (col1 int); +set role "distributed_source_role-\!"; +CREATE TABLE public.test_table4 (col2 int); +RESET ROLE; +select create_distributed_table('test_table3', 'col1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('test_table4', 
'col2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +set citus.log_remote_commands to on; +set citus.grep_remote_commands = '%REASSIGN OWNED BY%'; +set citus.enable_create_role_propagation to off; +set citus.enable_alter_role_propagation to off; +set citus.enable_alter_role_set_propagation to off; +REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO local_target_role1; +NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO local_target_role1 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO local_target_role1 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +show citus.enable_create_role_propagation; + citus.enable_create_role_propagation +--------------------------------------------------------------------- + off +(1 row) + +show citus.enable_alter_role_propagation; + citus.enable_alter_role_propagation +--------------------------------------------------------------------- + off +(1 row) + +show citus.enable_alter_role_set_propagation; + citus.enable_alter_role_set_propagation +--------------------------------------------------------------------- + off +(1 row) + +reset citus.grep_remote_commands; +reset citus.log_remote_commands; +reset citus.enable_create_role_propagation; +reset citus.enable_alter_role_propagation; +reset citus.enable_alter_role_set_propagation; +--check if the owner changed to local_target_role1 +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table3', 'test_table4') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + result 
+--------------------------------------------------------------------- + [{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}] + [{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}] + [{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}] +(3 rows) + +--clear resources +DROP OWNED BY distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner +FROM + pg_tables +WHERE + tablename in ('test_table', 'test_table2', 'test_table3', 'test_table4') + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) + +set client_min_messages to warning; +drop role distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1,local_source_role1; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 9528cc704..2b9fdeb2d 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -59,6 +59,7 @@ test: grant_on_database_propagation test: alter_database_propagation test: citus_shards +test: reassign_owned # ---------- # multi_citus_tools tests utility functions written for citus tools diff --git a/src/test/regress/sql/citus_schema_distribute_undistribute.sql b/src/test/regress/sql/citus_schema_distribute_undistribute.sql index 1008b90b2..a7e9bf051 100644 --- a/src/test/regress/sql/citus_schema_distribute_undistribute.sql +++ 
b/src/test/regress/sql/citus_schema_distribute_undistribute.sql @@ -185,7 +185,7 @@ SELECT citus_schema_undistribute('tenant1'); -- assign all tables to dummyregular except table5 SET role tenantuser; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO dummyregular; $$); +REASSIGN OWNED BY tenantuser TO dummyregular; CREATE TABLE tenant1.table5(id int); -- table owner check fails the distribution @@ -219,7 +219,7 @@ SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*)=0 FROM pg_dist_co SELECT result FROM run_command_on_all_nodes($$ SELECT array_agg(logicalrelid ORDER BY logicalrelid) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'tenant1.%' AND colocationid > 0 $$); RESET role; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO tenantuser; $$); +REASSIGN OWNED BY dummyregular TO tenantuser; DROP USER dummyregular; CREATE USER dummysuper superuser; diff --git a/src/test/regress/sql/citus_schema_move.sql b/src/test/regress/sql/citus_schema_move.sql index 8240feff7..bdf0d20ff 100644 --- a/src/test/regress/sql/citus_schema_move.sql +++ b/src/test/regress/sql/citus_schema_move.sql @@ -147,7 +147,7 @@ SELECT citus_schema_move('s2', 'dummy_node', 1234); -- assign all tables to regularuser RESET ROLE; -SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$); +REASSIGN OWNED BY tenantuser TO regularuser; GRANT USAGE ON SCHEMA citus_schema_move TO regularuser; diff --git a/src/test/regress/sql/reassign_owned.sql b/src/test/regress/sql/reassign_owned.sql new file mode 100644 index 000000000..0262b643c --- /dev/null +++ b/src/test/regress/sql/reassign_owned.sql @@ -0,0 +1,141 @@ +CREATE ROLE distributed_source_role1; +create ROLE "distributed_source_role-\!"; + +CREATE ROLE "distributed_target_role1-\!"; + +set citus.enable_create_role_propagation to off; +create ROLE local_target_role1; + + +\c - - - :worker_1_port +set citus.enable_create_role_propagation to 
off; +CREATE ROLE local_target_role1; + +\c - - - :master_port +set citus.enable_create_role_propagation to off; +create role local_source_role1; +reset citus.enable_create_role_propagation; + +GRANT CREATE ON SCHEMA public TO distributed_source_role1,"distributed_source_role-\!"; + +SET ROLE distributed_source_role1; +CREATE TABLE public.test_table (col1 int); + +set role "distributed_source_role-\!"; +CREATE TABLE public.test_table2 (col2 int); +RESET ROLE; +select create_distributed_table('test_table', 'col1'); +select create_distributed_table('test_table2', 'col2'); + + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table', 'test_table2') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + +--tests for reassing owned by with multiple distributed roles and a local role to a distributed role +--local role should be ignored +set citus.log_remote_commands to on; +set citus.grep_remote_commands = '%REASSIGN OWNED BY%'; +REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO "distributed_target_role1-\!"; +reset citus.grep_remote_commands; +reset citus.log_remote_commands; + +--check if the owner changed to "distributed_target_role1-\!" 
+ +RESET citus.log_remote_commands; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table', 'test_table2') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + +--tests for reassing owned by with multiple distributed roles and a local role to a local role +--local role should be ignored +SET ROLE distributed_source_role1; +CREATE TABLE public.test_table3 (col1 int); + +set role "distributed_source_role-\!"; +CREATE TABLE public.test_table4 (col2 int); +RESET ROLE; +select create_distributed_table('test_table3', 'col1'); +select create_distributed_table('test_table4', 'col2'); + +set citus.log_remote_commands to on; +set citus.grep_remote_commands = '%REASSIGN OWNED BY%'; +set citus.enable_create_role_propagation to off; +set citus.enable_alter_role_propagation to off; +set citus.enable_alter_role_set_propagation to off; +REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO local_target_role1; + +show citus.enable_create_role_propagation; +show citus.enable_alter_role_propagation; +show citus.enable_alter_role_set_propagation; + +reset citus.grep_remote_commands; +reset citus.log_remote_commands; +reset citus.enable_create_role_propagation; +reset citus.enable_alter_role_propagation; +reset citus.enable_alter_role_set_propagation; + + +--check if the owner changed to local_target_role1 +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner + FROM + pg_tables + WHERE + tablename in ('test_table3', 'test_table4') + ORDER BY tablename + ) q2 + $$ +) ORDER BY result; + +--clear resources +DROP OWNED BY distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1; + +SELECT result from run_command_on_all_nodes( + $$ + 
SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT + schemaname, + tablename, + tableowner +FROM + pg_tables +WHERE + tablename in ('test_table', 'test_table2', 'test_table3', 'test_table4') + ) q2 + $$ +) ORDER BY result; + + +set client_min_messages to warning; +drop role distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1,local_source_role1;