diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index b76a6d5bf..6d2dd0ba9 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + NIL, distArgumentIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index b62dda9ad..9e6b66e3e 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -34,6 +34,7 @@ #include "access/htup_details.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/pg_authid.h" #include "catalog/pg_database.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -44,6 +45,7 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "postmaster/postmaster.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -77,6 +79,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/reference_table_utils.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" #include "distributed/transaction_management.h" @@ -84,6 +87,13 @@ #include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" +#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \ + "SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)" +#define START_MANAGEMENT_TRANSACTION \ + "SELECT citus_internal.start_management_transaction('%lu')" +#define MARK_OBJECT_DISTRIBUTED \ + "SELECT citus_internal.mark_object_distributed(%d, %s, %d)" + bool EnableDDLPropagation = true; /* ddl propagation is enabled */ int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE; @@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree); static void DecrementUtilityHookCountersIfNecessary(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree); static bool ShouldCheckUndistributeCitusLocalTables(void); +static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString); +static void RunPostprocessMainDBCommand(Node *parsetree); /* * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of @@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, if (!CitusHasBeenLoaded()) { + if (!IsMainDB) + { + RunPreprocessMainDBCommand(parsetree, queryString); + } + /* * Ensure that utility commands do not behave any differently until CREATE * EXTENSION is invoked. @@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, PrevProcessUtility(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); + if (!IsMainDB) + { + RunPostprocessMainDBCommand(parsetree); + } + return; } else if (IsA(parsetree, CallStmt)) @@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void) { return activeDropSchemaOrDBs > 0; } + + +/* + * RunPreprocessMainDBCommand runs the necessary commands for a query, in main + * database before query is run on the local node with PrevProcessUtility + */ +static void +RunPreprocessMainDBCommand(Node *parsetree, const char *queryString) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + START_MANAGEMENT_TRANSACTION, + GetCurrentFullTransactionId().value); + RunCitusMainDBQuery(mainDBQuery->data); + mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER, + quote_literal_cstr(queryString), + quote_literal_cstr(CurrentUserName())); + RunCitusMainDBQuery(mainDBQuery->data); + } +} + + +/* + * RunPostprocessMainDBCommand runs the necessary commands for a query, in main + * database after query is run on the local node with PrevProcessUtility + */ +static void +RunPostprocessMainDBCommand(Node *parsetree) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree); + Oid roleOid = get_role_oid(createRoleStmt->role, false); + appendStringInfo(mainDBQuery, + MARK_OBJECT_DISTRIBUTED, + AuthIdRelationId, + quote_literal_cstr(createRoleStmt->role), + roleOid); + RunCitusMainDBQuery(mainDBQuery->data); + } +} diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index c6a34a9d7..57069f698 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -425,11 +425,13 @@ GetConnParam(const char *keyword) /* * GetEffectiveConnKey checks whether there is any pooler configuration for the - * provided key (host/port combination). The one case where this logic is not - * applied is for loopback connections originating within the task tracker. If - * a corresponding row is found in the poolinfo table, a modified (effective) - * key is returned with the node, port, and dbname overridden, as applicable, - * otherwise, the original key is returned unmodified. + * provided key (host/port combination). If a corresponding row is found in the + * poolinfo table, a modified (effective) key is returned with the node, port, + * and dbname overridden, as applicable, otherwise, the original key is returned + * unmodified. + * + * In the case of Citus non-main databases we just return the key, since we + * would not have access to tables with worker information. */ ConnectionHashKey * GetEffectiveConnKey(ConnectionHashKey *key) @@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key) return key; } + if (!CitusHasBeenLoaded()) + { + /* + * This happens when we connect to main database over localhost + * from some non Citus database. + */ + return key; + } + WorkerNode *worker = FindWorkerNode(key->hostname, key->port); + if (worker == NULL) { /* this can be hit when the key references an unknown node */ diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 25d976a55..edccd86b9 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -49,18 +49,42 @@ #include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" -static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, + char *objectName); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); static bool IsObjectDistributed(const ObjectAddress *address); +PG_FUNCTION_INFO_V1(mark_object_distributed); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); +/* + * mark_object_distributed adds an object to pg_dist_object + * in all of the nodes. + */ +Datum +mark_object_distributed(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + Oid classId = PG_GETARG_OID(0); + text *objectNameText = PG_GETARG_TEXT_P(1); + char *objectName = text_to_cstring(objectNameText); + Oid objectId = PG_GETARG_OID(2); + ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*objectAddress, classId, objectId); + MarkObjectDistributedWithName(objectAddress, objectName); + PG_RETURN_VOID(); +} + + /* * citus_unmark_object_distributed(classid oid, objid oid, objsubid int) * @@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address) void MarkObjectDistributed(const ObjectAddress *distAddress) { + MarkObjectDistributedWithName(distAddress, ""); +} + + +/* + * MarkObjectDistributedWithName marks an object as a distributed object. + * Same as MarkObjectDistributed but this function also allows passing an objectName + * that is used in case the object does not exists for the current transaction. + */ +void +MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName) +{ + if (!CitusHasBeenLoaded()) + { + elog(ERROR, "Cannot mark object distributed because Citus has not been loaded."); + } MarkObjectDistributedLocally(distAddress); if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, objectName); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); } } @@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, ""); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); } } @@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId) * for the given object address. */ static char * -CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName) { /* create a list by adding the address of value to not to have warning */ List *objectAddressList = list_make1((ObjectAddress *) objectAddress); + + /* names also require a list so we create a nested list here */ + List *objectNameList = list_make1(list_make1((char *) objectName)); List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + objectNameList, distArgumetIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1b2fa229f..b8983ba21 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -79,6 +79,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/shardinterval_utils.h" #include "distributed/shared_library_init.h" #include "distributed/utils/array_type.h" @@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId) char * GetAuthinfoViaCatalog(const char *roleName, int64 nodeId) { + /* + * Citus will not be loaded when we run a global DDL command from a + * Citus non-main database. + */ + if (!CitusHasBeenLoaded()) + { + return ""; + } char *authinfo = ""; Datum nodeIdDatumArray[2] = { Int32GetDatum(nodeId), diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f0be1995b..842a45519 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -83,6 +83,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/tenant_schema_metadata.h" #include "distributed/utils/array_type.h" @@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList) */ char * MarkObjectsDistributedCreateCommand(List *addresses, + List *namesArg, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations) @@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses, int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter); List *names = NIL; List *args = NIL; + char *objectType = NULL; - char *objectType = getObjectTypeDescription(address, false); - getObjectIdentityParts(address, &names, &args, false); + if (IsMainDBCommand) + { + /* + * When we try to distribute an object that's being created in a non Citus + * main database, we cannot find the name, since the object is not visible + * in Citus main database. + * Because of that we need to pass the name to this function. + */ + names = list_nth(namesArg, currentObjectCounter); + bool missingOk = false; + objectType = getObjectTypeDescription(address, missingOk); + } + else + { + objectType = getObjectTypeDescription(address, false); + getObjectIdentityParts(address, &names, &args, IsMainDBCommand); + } if (!isFirstObject) { @@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context) char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), + NIL, list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ffb235596..ad5a14a25 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -94,6 +94,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/repartition_executor.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/resource_lock.h" @@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomStringVariable( + "citus.superuser", + gettext_noop("Name of a superuser role to be used in Citus main database " + "connections"), + NULL, + &SuperuserRole, + "", + PGC_SUSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.task_assignment_policy", gettext_noop("Sets the policy to use when assigning tasks to worker nodes."), @@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status) */ InitializeBackendData(port->application_name); + IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 || + strncmp(MainDb, port->database_name, NAMEDATALEN) == 0); /* let other authentication hooks to kick in first */ if (original_client_auth_hook) diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 63c4a527f..2ce2d7a21 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -3,3 +3,10 @@ #include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql" + +#include "udfs/start_management_transaction/12.2-1.sql" +#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql" +#include "udfs/mark_object_distributed/12.2-1.sql" +#include "udfs/commit_management_command_2pc/12.2-1.sql" + +ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8; diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index d18f7257b..0a6f68b06 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -3,3 +3,20 @@ DROP FUNCTION pg_catalog.citus_internal_database_command(text); #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql" + +DROP FUNCTION citus_internal.start_management_transaction( + outer_xid xid8 +); + +DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user( + query text, + username text +); + +DROP FUNCTION citus_internal.mark_object_distributed( + classId Oid, objectName text, objectId Oid +); + +DROP FUNCTION citus_internal.commit_management_command_2pc(); + +ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql new file mode 100644 index 000000000..8c24e6dd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION citus_internal.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql new file mode 100644 index 000000000..fc1076e9c --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$; + +COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql new file mode 100644 index 000000000..ee2c5e7e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/mark_object_distributed/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$mark_object_distributed$$; + +COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql new file mode 100644 index 000000000..ec1f416d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/12.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql new file mode 100644 index 000000000..ec1f416d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_management_transaction/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$start_management_transaction$$; + +COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 3dc89c995..71b6a78dd 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -19,14 +19,19 @@ #include "miscadmin.h" #include "access/xact.h" +#include "postmaster/postmaster.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/xid8.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" +#include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +PG_FUNCTION_INFO_V1(start_management_transaction); +PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user); +PG_FUNCTION_INFO_V1(commit_management_command_2pc); static char *IsolationLevelName[] = { "READ UNCOMMITTED", @@ -64,6 +72,154 @@ static char *IsolationLevelName[] = { "SERIALIZABLE" }; +/* + * These variables are necessary for running queries from a database that is not + * the Citus main database. Some of these queries need to be propagated to the + * workers and Citus main database will be used for these queries, such as + * CREATE ROLE. For that we create a connection to the Citus main database and + * run queries from there. + */ + +/* The MultiConnection used for connecting Citus main database. */ +MultiConnection *MainDBConnection = NULL; + +/* + * IsMainDBCommand is true if this is a query in the Citus main database that is started + * by a query from a different database. + */ +bool IsMainDBCommand = false; + +/* + * The transaction id of the query from the other database that started the + * main database query. + */ +FullTransactionId OuterXid; + +/* + * Shows if this is the Citus main database or not. We needed a variable instead of + * checking if this database's name is the same as MainDb because we sometimes need + * this value outside a transaction where we cannot reach the current database name. + */ +bool IsMainDB = true; + +/* + * Name of a superuser role to be used during main database connections. + */ +char *SuperuserRole = NULL; + + +/* + * start_management_transaction starts a management transaction + * in the main database by recording the outer transaction's transaction id and setting + * IsMainDBCommand to true. + */ +Datum +start_management_transaction(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + OuterXid = PG_GETARG_FULLTRANSACTIONID(0); + IsMainDBCommand = true; + + Use2PCForCoordinatedTransaction(); + + PG_RETURN_VOID(); +} + + +/* + * execute_command_on_remote_nodes_as_user executes the query on the nodes + * other than the current node, using the user passed. + */ +Datum +execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + text *queryText = PG_GETARG_TEXT_P(0); + char *query = text_to_cstring(queryText); + + text *usernameText = PG_GETARG_TEXT_P(1); + char *username = text_to_cstring(usernameText); + + StringInfo queryToSend = makeStringInfo(); + + appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query, + ENABLE_METADATA_SYNC); + + SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data); + PG_RETURN_VOID(); +} + + +/* + * commit_management_command_2pc is a wrapper UDF for + * CoordinatedRemoteTransactionsCommit + */ +Datum +commit_management_command_2pc(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + RecoverTwoPhaseCommits(); + + PG_RETURN_VOID(); +} + + +/* + * RunCitusMainDBQuery creates a connection to Citus main database if necessary + * and runs the query over the connection in the main database. + */ +void +RunCitusMainDBQuery(char *query) +{ + if (MainDBConnection == NULL) + { + if (strlen(SuperuserRole) == 0) + { + ereport(ERROR, (errmsg("No superuser role is given for Citus main " + "database connection"), + errhint("Set citus.superuser to a superuser role name"))); + } + int flags = 0; + MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName, + PostPortNumber, + SuperuserRole, + MainDb); + RemoteTransactionBegin(MainDBConnection); + } + + SendRemoteCommand(MainDBConnection, query); + + PGresult *result = GetRemoteCommandResult(MainDBConnection, true); + + if (!IsResponseOK(result)) + { + ReportResultError(MainDBConnection, result, ERROR); + } + + ForgetResults(MainDBConnection); +} + + +/* + * CleanCitusMainDBConnection closes and removes the connection to Citus main database. + */ +void +CleanCitusMainDBConnection(void) +{ + if (MainDBConnection == NULL) + { + return; + } + CloseConnection(MainDBConnection); + MainDBConnection = NULL; +} + /* * StartRemoteTransactionBegin initiates beginning the remote transaction in @@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port); if (workerNode != NULL) { - LogTransactionRecord(workerNode->groupId, transaction->preparedName); + LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid); } /* diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index d133d7be6..8b3333639 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "catalog/dependency.h" #include "common/hashfn.h" #include "nodes/print.h" +#include "postmaster/postmaster.h" #include "storage/fd.h" #include "utils/datum.h" #include "utils/guc.h" @@ -46,6 +47,7 @@ #include "distributed/multi_logical_replication.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" +#include "distributed/remote_commands.h" #include "distributed/repartition_join_execution.h" #include "distributed/replication_origin_session_utils.h" #include "distributed/shard_cleaner.h" @@ -55,6 +57,9 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" +#define COMMIT_MANAGEMENT_COMMAND_2PC \ + "SELECT citus_internal.commit_management_command_2pc()" + CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) MemoryContext previousContext = MemoryContextSwitchTo(CitusXactCallbackContext); - if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED && + !IsMainDBCommand) { /* handles both already prepared and open transactions */ CoordinatedRemoteTransactionsCommit(); } + /* + * If this is a non-Citus main database we should try to commit the prepared + * transactions created by the Citus main database on the worker nodes. + */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC); + CleanCitusMainDBConnection(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) RemoveIntermediateResultsDirectories(); + CleanCitusMainDBConnection(); + /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { @@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) break; } + + /* + * If this is a non-Citus main database we should commit the Citus + * main database query. So if some error happens on the distributed main + * database query we wouldn't have committed the current query. + */ + if (!IsMainDB && MainDBConnection != NULL) + { + RunCitusMainDBQuery("COMMIT"); + } + /* * TODO: It'd probably be a good idea to force constraints and * such to 'immediate' here. Deferred triggers might try to send @@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * us to mark failed placements as invalid. Better don't use * this for anything important (i.e. DDL/metadata). */ - CoordinatedRemoteTransactionsCommit(); + if (IsMainDB) + { + CoordinatedRemoteTransactionsCommit(); + } CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 0ec5ba0a3..653b962db 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -29,10 +29,12 @@ #include "lib/stringinfo.h" #include "storage/lmgr.h" #include "storage/lock.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/xid8.h" #include "pg_version_constants.h" @@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) * prepared transaction should be committed. */ void -LogTransactionRecord(int32 groupId, char *transactionName) +LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid) { Datum values[Natts_pg_dist_transaction]; bool isNulls[Natts_pg_dist_transaction]; @@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName) values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid); /* open transaction relation and insert new tuple */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), @@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode) continue; } + /* Check if the transaction is created by an outer transaction from a non-main database */ + bool outerXidIsNull = false; + Datum outerXidDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_outerxid, + tupleDescriptor, &outerXidIsNull); + + TransactionId outerXid = 0; + if (!outerXidIsNull) + { + FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum); + outerXid = XidFromFullTransactionId(outerFullXid); + } + + if (outerXid != 0) + { + bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid); + bool outerXactDidCommit = TransactionIdDidCommit(outerXid); + if (outerXactIsInProgress && !outerXactDidCommit) + { + /* + * The transaction is initiated from an outer transaction and the outer + * transaction is not yet committed, so we should not commit either. + * We remove this transaction from the pendingTransactionSet so it'll + * not be aborted by the loop below. + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &foundPreparedTransactionBeforeCommit); + continue; + } + else if (!outerXactIsInProgress && !outerXactDidCommit) + { + /* + * Since outer transaction isn't in progress and did not commit we need to + * abort the prepared transaction too. We do this by simply doing the same + * thing we would do for transactions that are initiated from the main + * database. + */ + continue; + } + else + { + /* + * Outer transaction did commit, so we can try to commit the prepared + * transaction too. + */ + } + } + /* * Remove the transaction from the pending list such that only transactions * that need to be aborted remain at the end. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 2af1e9a6c..9c8563de0 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -234,7 +234,8 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES || + targetWorkerSet == METADATA_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index cf24a8c81..bbbbdf9da 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e20c44535..9f4c0a24b 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void); extern List * CitusTableMetadataCreateCommandList(Oid relationId); extern List * NodeMetadataDropCommands(void); extern char * MarkObjectsDistributedCreateCommand(List *addresses, + List *names, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations); diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h index 815633b70..95658f782 100644 --- a/src/include/distributed/pg_dist_transaction.h +++ b/src/include/distributed/pg_dist_transaction.h @@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; * compiler constants for pg_dist_transaction * ---------------- */ -#define Natts_pg_dist_transaction 2 +#define Natts_pg_dist_transaction 3 #define Anum_pg_dist_transaction_groupid 1 #define Anum_pg_dist_transaction_gid 2 +#define Anum_pg_dist_transaction_outerxid 3 #endif /* PG_DIST_TRANSACTION_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 1c422da20..2b61c25bd 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId); +extern void RunCitusMainDBQuery(char *query); +extern void CleanCitusMainDBConnection(void); + +extern bool IsMainDBCommand; +extern bool IsMainDB; +extern char *SuperuserRole; +extern char *MainDb; +extern struct MultiConnection *MainDBConnection; + #endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 811dbb949..a4073875a 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,7 +17,8 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ -extern void LogTransactionRecord(int32 groupId, char *transactionName); +extern void LogTransactionRecord(int32 groupId, char *transactionName, + FullTransactionId outerXid); extern int RecoverTwoPhaseCommits(void); extern void DeleteWorkerTransactions(WorkerNode *workerNode); diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py new file mode 100644 index 000000000..cf824f926 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -0,0 +1,154 @@ +def test_main_commited_outer_not_yet(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db1") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db1") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" + ) + cur2.execute("COMMIT") + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # user should not be created on the worker because outer transaction is not committed yet + role_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + int(role_before_commit) == 0 + ), "role is on pg_dist_object despite not committing" + + # user should not be in pg_dist_object on the worker because outer transaction is not committed yet + pdo_before_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert int(pdo_before_commit) == 0, "role is created despite not committing" + + # commit in cur1 so the transaction recovery thinks this is a successful transaction + cur1.execute("COMMIT") + + # run the transaction recovery again after committing + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is created by the transaction recovery on the worker + role_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'" + ) + + assert ( + int(role_after_commit) == 1 + ), "role is not created during recovery despite committing" + + # check that the user is on pg_dist_object on the worker after transaction recovery + pdo_after_commit = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" + ) + + assert ( + int(pdo_after_commit) == 1 + ), "role is not on pg_dist_object after recovery despite committing" + + c.sql("DROP DATABASE db1") + c.sql( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')" + ) + c.sql( + """ + SELECT run_command_on_workers($$ + DELETE FROM pg_dist_object + WHERE objid::regrole::text = 'u1' + $$) + """ + ) + + +def test_main_commited_outer_aborted(cluster): + c = cluster.coordinator + w0 = cluster.workers[0] + + # create a non-main database + c.sql("CREATE DATABASE db2") + + # we will use cur1 to simulate non-main database user and + # cur2 to manually do the steps we would do in the main database + with c.cur(dbname="db2") as cur1, c.cur() as cur2: + # let's start a transaction and find its transaction id + cur1.execute("BEGIN") + cur1.execute("SELECT txid_current()") + txid = cur1.fetchall() + + # using the transaction id of the cur1 simulate the main database commands manually + cur2.execute("BEGIN") + cur2.execute( + "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),) + ) + cur2.execute( + "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" + ) + cur2.execute( + "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" + ) + cur2.execute("COMMIT") + + # abort cur1 so the transaction recovery thinks this is an aborted transaction + cur1.execute("ABORT") + + # check that the user is not yet created on the worker + role_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert int(role_before_recovery) == 0, "role is already created before recovery" + + # check that the user is not on pg_dist_object on the worker + pdo_before_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_before_recovery) == 0 + ), "role is already on pg_dist_object before recovery" + + # run the transaction recovery + c.sql("SELECT recover_prepared_transactions()") + + # check that the user is not created by the transaction recovery on the worker + role_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'" + ) + + assert ( + int(role_after_recovery) == 0 + ), "role is created during recovery despite aborting" + + # check that the user is not on pg_dist_object on the worker after transaction recovery + pdo_after_recovery = w0.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" + ) + + assert ( + int(pdo_after_recovery) == 0 + ), "role is on pg_dist_object after recovery despite aborting" + + c.sql("DROP DATABASE db2") diff --git a/src/test/regress/expected/failure_non_main_db_2pc.out b/src/test/regress/expected/failure_non_main_db_2pc.out new file mode 100644 index 000000000..1e8558136 --- /dev/null +++ b/src/test/regress/expected/failure_non_main_db_2pc.out @@ -0,0 +1,154 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_1; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | user_1 + 1 | user_1 + 2 | user_1 +(3 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db1 +CREATE USER user_2; +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +while executing command on localhost:xxxxx +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + nodeid | result +--------------------------------------------------------------------- + 0 | + 1 | + 2 | +(3 rows) + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; +NOTICE: role "user_2" does not exist, skipping +SELECT citus_set_coordinator_host('localhost'); + citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c other_db2 +CREATE USER user_3; +\c regression +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + + user_3 + user_3 +(3 rows) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + result +--------------------------------------------------------------------- + user_3 + user_3 + user_3 +(3 rows) + +DROP DATABASE other_db2; +DROP USER user_3; +\c - - - :master_port +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + result +--------------------------------------------------------------------- + DELETE 1 + DELETE 1 + DELETE 1 +(3 rows) + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 43f9c3b98..b9f489a1f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 12.2-1 ALTER EXTENSION citus UPDATE TO '12.2-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- + | function citus_internal.commit_management_command_2pc() void + | function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void + | function citus_internal.mark_object_distributed(oid,text,oid) void + | function citus_internal.start_management_transaction(xid8) void | function citus_internal_database_command(text) void -(1 row) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 20cec7578..0a29a22af 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -64,7 +64,7 @@ SELECT recover_prepared_transactions(); (1 row) -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; groupid | gid --------------------------------------------------------------------- 122 | citus_122_should_do_nothing diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out new file mode 100644 index 000000000..67d7dad3f --- /dev/null +++ b/src/test/regress/expected/other_databases.out @@ -0,0 +1,130 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; +SET citus.next_shard_id TO 10231023; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c other_db1 +SHOW citus.main_db; + citus.main_db +--------------------------------------------------------------------- + regression +(1 row) + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +ERROR: No superuser role is given for Citus main database connection +HINT: Set citus.superuser to a superuser role name +SET citus.superuser TO 'postgres'; +CREATE USER other_db_user1; +CREATE USER other_db_user2; +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +ERROR: division by zero +COMMIT; +CREATE USER other_db_user8; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user4 + other_db_user8 +(5 rows) + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; +NOTICE: role "other_db_user5" does not exist, skipping +NOTICE: role "other_db_user6" does not exist, skipping +NOTICE: role "other_db_user7" does not exist, skipping +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); +ERROR: operation is not allowed +HINT: Run the command with a superuser. +\c other_db1 +CREATE USER other_db_user9; +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user9 +(1 row) + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; +-- test from a worker +\c - - - :worker_1_port +CREATE DATABASE other_db2; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c other_db2 +CREATE USER worker_user1; +BEGIN; +CREATE USER worker_user2; +COMMIT; +BEGIN; +CREATE USER worker_user3; +ROLLBACK; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + worker_user1 + worker_user2 +(2 rows) + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; +NOTICE: role "worker_user3" does not exist, skipping +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port +DROP SCHEMA other_databases; +DROP DATABASE other_db1; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 942e0336f..6d41ac058 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -56,13 +56,17 @@ ORDER BY 1; function citus_get_active_worker_nodes() function citus_get_node_clock() function citus_get_transaction_clock() + function citus_internal.commit_management_command_2pc() + function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.find_groupid_for_node(text,integer) + function citus_internal.mark_object_distributed(oid,text,oid) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func() function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function citus_internal.start_management_transaction(xid8) function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") @@ -344,5 +348,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(334 rows) +(338 rows) diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index afc4780bf..e1ad362b5 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -34,6 +34,7 @@ test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_mx_metadata_sync_multi_trans test: failure_connection_establishment +test: failure_non_main_db_2pc # this test syncs metadata to the workers test: failure_failover_to_local_execution diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 866b07f5f..5c9d8a45c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,6 +108,7 @@ test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes test: background_task_queue_monitor +test: other_databases # Causal clock test test: clock diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 95330c638..c9a85d523 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.enable_change_data_capture=on"); push(@pgOptions, "citus.stat_tenants_limit = 2"); push(@pgOptions, "citus.stat_tenants_track = 'ALL'"); +push(@pgOptions, "citus.main_db = 'regression'"); +push(@pgOptions, "citus.superuser = 'postgres'"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/failure_non_main_db_2pc.sql b/src/test/regress/sql/failure_non_main_db_2pc.sql new file mode 100644 index 000000000..74061ae34 --- /dev/null +++ b/src/test/regress/sql/failure_non_main_db_2pc.sql @@ -0,0 +1,75 @@ +SELECT citus.mitmproxy('conn.allow()'); + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; + +CREATE DATABASE other_db1; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db1 + +CREATE USER user_1; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1; + + +SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()'); + +\c other_db1 + +CREATE USER user_2; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1; + +DROP DATABASE other_db1; +-- user_2 should not exist because the query to create it will fail +-- but let's make sure we try to drop it just in case +DROP USER IF EXISTS user_1, user_2; + +SELECT citus_set_coordinator_host('localhost'); + +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()'); + +\c other_db2 + +CREATE USER user_3; + +\c regression + +SELECT citus.mitmproxy('conn.allow()'); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + +SELECT recover_prepared_transactions(); + +SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1; + +DROP DATABASE other_db2; +DROP USER user_3; + +\c - - - :master_port + +SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$); + +DROP SCHEMA failure_non_main_db_2pc; diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 2a6b4991b..e46917f35 100644 --- a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing'); SELECT recover_prepared_transactions(); -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; ROLLBACK PREPARED 'citus_122_should_do_nothing'; SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql new file mode 100644 index 000000000..629f74f45 --- /dev/null +++ b/src/test/regress/sql/other_databases.sql @@ -0,0 +1,98 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; + +SET citus.next_shard_id TO 10231023; + +CREATE DATABASE other_db1; + +\c other_db1 +SHOW citus.main_db; + +-- check that empty citus.superuser gives error +SET citus.superuser TO ''; +CREATE USER empty_superuser; +SET citus.superuser TO 'postgres'; + +CREATE USER other_db_user1; +CREATE USER other_db_user2; + +BEGIN; +CREATE USER other_db_user3; +CREATE USER other_db_user4; +COMMIT; + +BEGIN; +CREATE USER other_db_user5; +CREATE USER other_db_user6; +ROLLBACK; + +BEGIN; +CREATE USER other_db_user7; +SELECT 1/0; +COMMIT; + +CREATE USER other_db_user8; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8; + +-- Make sure non-superuser roles cannot use internal GUCs +-- but they can still create a role +CREATE USER nonsuperuser CREATEROLE; +GRANT ALL ON SCHEMA citus_internal TO nonsuperuser; +SET ROLE nonsuperuser; +SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); + +\c other_db1 +CREATE USER other_db_user9; + +RESET ROLE; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser; +DROP USER other_db_user9, nonsuperuser; + +-- test from a worker +\c - - - :worker_1_port + +CREATE DATABASE other_db2; + +\c other_db2 + +CREATE USER worker_user1; + +BEGIN; +CREATE USER worker_user2; +COMMIT; + +BEGIN; +CREATE USER worker_user3; +ROLLBACK; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +\c - - - :master_port +SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1; + +-- some user creation commands will fail but let's make sure we try to drop them just in case +DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; + +\c - - - :worker_1_port +DROP DATABASE other_db2; +\c - - - :master_port + +DROP SCHEMA other_databases; +DROP DATABASE other_db1;