Adds 2PC distributed commands from other databases (#7203)

DESCRIPTION: Adds support for 2PC from non-Citus main databases

This PR only adds support for `CREATE USER` queries; other queries still
need to be added. That should be simple, because this PR creates the
underlying structure.

The Citus main database is the database where the Citus extension is
created. A non-main database is any other database on the same node as
a Citus main database.

When a `CREATE USER` query is run on a non-main database we:

1. Run `start_management_transaction` on the main database. This
function saves the outer transaction's xid (the non-main database
query's transaction id) and marks the current query as main db command.
2. Run `execute_command_on_remote_nodes_as_user("CREATE USER
<username>", <username to run the command>)` on the main database. This
function creates the users in the rest of the cluster by running the
query on the other nodes. The user on the current node is created by the
outer query on the non-main database, so that subsequent commands
in the same transaction can see this user.
3. Run `mark_object_distributed` on the main database. This function
adds the user to `pg_dist_object` in all of the nodes, including the
current one.

This PR also implements transaction recovery for the queries from
non-main databases.
pull/7253/head
Halil Ozan Akgül 2023-12-22 19:19:41 +03:00 committed by GitHub
parent 6801a1ed1e
commit b877d606c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1146 additions and 23 deletions

View File

@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
NIL,
distArgumentIndexList,
colocationIdList,
forceDelegationList);

View File

@ -34,6 +34,7 @@
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
@ -44,6 +45,7 @@
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -77,6 +79,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_management.h"
@ -84,6 +87,13 @@
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"
/*
 * Query templates that a non-main database backend sends to the Citus main
 * database.
 *
 * - EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER takes two already-quoted SQL
 *   literals: the command text and the user name.
 * - START_MANAGEMENT_TRANSACTION takes a uint64 full transaction id. It is
 *   formatted with UINT64_FORMAT because "%lu" is only correct on platforms
 *   where unsigned long is 64 bits wide.
 * - MARK_OBJECT_DISTRIBUTED takes a class Oid, an already-quoted object name
 *   literal and an object Oid. Oids are unsigned, hence "%u" rather than "%d".
 */
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
	"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \
	"SELECT citus_internal.start_management_transaction('" UINT64_FORMAT "')"
#define MARK_OBJECT_DISTRIBUTED \
	"SELECT citus_internal.mark_object_distributed(%u, %s, %u)"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree);
static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
static void RunPostprocessMainDBCommand(Node *parsetree);
/*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
if (!CitusHasBeenLoaded())
{
if (!IsMainDB)
{
RunPreprocessMainDBCommand(parsetree, queryString);
}
/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);
if (!IsMainDB)
{
RunPostprocessMainDBCommand(parsetree);
}
return;
}
else if (IsA(parsetree, CallStmt))
@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}
/*
 * RunPreprocessMainDBCommand sends to the main database the commands that
 * have to run before the statement is executed on the local node by
 * PrevProcessUtility. Only CreateRoleStmt is handled; any other statement
 * type is left alone.
 */
static void
RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
{
	if (!IsA(parsetree, CreateRoleStmt))
	{
		return;
	}

	/* first hand this backend's full transaction id to the main database */
	StringInfo startTransactionQuery = makeStringInfo();
	appendStringInfo(startTransactionQuery,
					 START_MANAGEMENT_TRANSACTION,
					 GetCurrentFullTransactionId().value);
	RunCitusMainDBQuery(startTransactionQuery->data);

	/* then ask the main database to run the statement on the remote nodes */
	char *quotedQuery = quote_literal_cstr(queryString);
	char *quotedUserName = quote_literal_cstr(CurrentUserName());

	StringInfo propagateQuery = makeStringInfo();
	appendStringInfo(propagateQuery,
					 EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
					 quotedQuery,
					 quotedUserName);
	RunCitusMainDBQuery(propagateQuery->data);
}
/*
 * RunPostprocessMainDBCommand sends to the main database the commands that
 * have to run after the statement was executed on the local node by
 * PrevProcessUtility. Only CreateRoleStmt is handled; any other statement
 * type is left alone.
 */
static void
RunPostprocessMainDBCommand(Node *parsetree)
{
	if (!IsA(parsetree, CreateRoleStmt))
	{
		return;
	}

	CreateRoleStmt *roleStmt = castNode(CreateRoleStmt, parsetree);

	/* look the role up by name; the second argument (false) is missing_ok */
	Oid createdRoleOid = get_role_oid(roleStmt->role, false);
	char *quotedRoleName = quote_literal_cstr(roleStmt->role);

	/* have the main database record the role in pg_dist_object */
	StringInfo markDistributedQuery = makeStringInfo();
	appendStringInfo(markDistributedQuery,
					 MARK_OBJECT_DISTRIBUTED,
					 AuthIdRelationId,
					 quotedRoleName,
					 createdRoleOid);
	RunCitusMainDBQuery(markDistributedQuery->data);
}

View File

@ -425,11 +425,13 @@ GetConnParam(const char *keyword)
/*
* GetEffectiveConnKey checks whether there is any pooler configuration for the
* provided key (host/port combination). The one case where this logic is not
* applied is for loopback connections originating within the task tracker. If
* a corresponding row is found in the poolinfo table, a modified (effective)
* key is returned with the node, port, and dbname overridden, as applicable,
* otherwise, the original key is returned unmodified.
* provided key (host/port combination). If a corresponding row is found in the
* poolinfo table, a modified (effective) key is returned with the node, port,
* and dbname overridden, as applicable, otherwise, the original key is returned
* unmodified.
*
* In the case of Citus non-main databases we just return the key, since we
* would not have access to tables with worker information.
*/
ConnectionHashKey *
GetEffectiveConnKey(ConnectionHashKey *key)
@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
return key;
}
if (!CitusHasBeenLoaded())
{
/*
* This happens when we connect to main database over localhost
* from some non Citus database.
*/
return key;
}
WorkerNode *worker = FindWorkerNode(key->hostname, key->port);
if (worker == NULL)
{
/* this can be hit when the key references an unknown node */

View File

@ -49,18 +49,42 @@
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress,
char *objectName);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
static bool IsObjectDistributed(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(mark_object_distributed);
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
/*
* mark_object_distributed adds an object to pg_dist_object
* in all of the nodes.
*/
Datum
mark_object_distributed(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
Oid classId = PG_GETARG_OID(0);
text *objectNameText = PG_GETARG_TEXT_P(1);
char *objectName = text_to_cstring(objectNameText);
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);
PG_RETURN_VOID();
}
/*
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
*
@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address)
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
/*
 * Delegate with an empty object name. The name list is only consulted by
 * MarkObjectsDistributedCreateCommand when IsMainDBCommand is set.
 */
MarkObjectDistributedWithName(distAddress, "");
}
/*
* MarkObjectDistributedWithName marks an object as a distributed object.
* Same as MarkObjectDistributed but this function also allows passing an objectName
* that is used in case the object does not exist for the current transaction.
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
{
if (!CitusHasBeenLoaded())
{
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
}
MarkObjectDistributedLocally(distAddress);
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
}
}
@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, "");
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}
@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId)
* for the given object address.
*/
static char *
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName)
{
/* create a list by adding the address of value to not to have warning */
List *objectAddressList =
list_make1((ObjectAddress *) objectAddress);
/* names also require a list so we create a nested list here */
List *objectNameList = list_make1(list_make1((char *) objectName));
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
objectNameList,
distArgumetIndexList,
colocationIdList,
forceDelegationList);

View File

@ -79,6 +79,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/utils/array_type.h"
@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
/*
* Citus will not be loaded when we run a global DDL command from a
* Citus non-main database.
*/
if (!CitusHasBeenLoaded())
{
return "";
}
char *authinfo = "";
Datum nodeIdDatumArray[2] = {
Int32GetDatum(nodeId),

View File

@ -83,6 +83,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
*/
char *
MarkObjectsDistributedCreateCommand(List *addresses,
List *namesArg,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations)
@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses,
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
List *names = NIL;
List *args = NIL;
char *objectType = NULL;
char *objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, false);
if (IsMainDBCommand)
{
/*
* When we try to distribute an object that's being created in a non Citus
* main database, we cannot find the name, since the object is not visible
* in Citus main database.
* Because of that we need to pass the name to this function.
*/
names = list_nth(namesArg, currentObjectCounter);
bool missingOk = false;
objectType = getObjectTypeDescription(address, missingOk);
}
else
{
objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, IsMainDBCommand);
}
if (!isFirstObject)
{
@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context)
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(list_make1(address),
NIL,
list_make1_int(distributionArgumentIndex),
list_make1_int(colocationId),
list_make1_int(forceDelegation));

View File

@ -94,6 +94,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/repartition_executor.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
DefineCustomStringVariable(
"citus.superuser",
gettext_noop("Name of a superuser role to be used in Citus main database "
"connections"),
NULL,
&SuperuserRole,
"",
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.task_assignment_policy",
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status)
*/
InitializeBackendData(port->application_name);
IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 ||
strncmp(MainDb, port->database_name, NAMEDATALEN) == 0);
/* let other authentication hooks to kick in first */
if (original_client_auth_hook)

View File

@ -3,3 +3,10 @@
#include "udfs/citus_internal_database_command/12.2-1.sql"
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
#include "udfs/start_management_transaction/12.2-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
#include "udfs/mark_object_distributed/12.2-1.sql"
#include "udfs/commit_management_command_2pc/12.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;

View File

@ -3,3 +3,20 @@
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
DROP FUNCTION citus_internal.start_management_transaction(
outer_xid xid8
);
DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(
query text,
username text
);
DROP FUNCTION citus_internal.mark_object_distributed(
classId Oid, objectName text, objectId Oid
);
DROP FUNCTION citus_internal.commit_management_command_2pc();
ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
IS 'executes a query on the nodes other than the current one';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
IS 'executes a query on the nodes other than the current one';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
IS 'internal Citus function that starts a management transaction in the main database';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
IS 'internal Citus function that starts a management transaction in the main database';

View File

@ -19,14 +19,19 @@
#include "miscadmin.h"
#include "access/xact.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/xid8.h"
#include "distributed/backend_data.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection
static void Assign2PCIdentifier(MultiConnection *connection);
PG_FUNCTION_INFO_V1(start_management_transaction);
PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user);
PG_FUNCTION_INFO_V1(commit_management_command_2pc);
static char *IsolationLevelName[] = {
"READ UNCOMMITTED",
@ -64,6 +72,154 @@ static char *IsolationLevelName[] = {
"SERIALIZABLE"
};
/*
 * These variables are necessary for running queries from a database that is not
 * the Citus main database. Some of these queries (such as CREATE ROLE) need to
 * be propagated to the workers, and the Citus main database is used for that.
 * To do so we create a connection to the Citus main database and run queries
 * from there.
 */
/*
 * The MultiConnection used for connecting to the Citus main database.
 * RunCitusMainDBQuery opens it on first use; CleanCitusMainDBConnection closes
 * it and resets the pointer to NULL.
 */
MultiConnection *MainDBConnection = NULL;
/*
 * IsMainDBCommand is true if this is a query in the Citus main database that is started
 * by a query from a different database. It is set by start_management_transaction.
 */
bool IsMainDBCommand = false;
/*
 * The transaction id of the query from the other database that started the
 * main database query. It is set by start_management_transaction. There is no
 * explicit initializer, so it starts out zeroed.
 */
FullTransactionId OuterXid;
/*
 * Shows if this is the Citus main database or not. We needed a variable instead of
 * checking if this database's name is the same as MainDb because we sometimes need
 * this value outside a transaction where we cannot reach the current database name.
 */
bool IsMainDB = true;
/*
 * Name of a superuser role to be used during main database connections.
 * RunCitusMainDBQuery raises an error when this is an empty string.
 */
char *SuperuserRole = NULL;
/*
* start_management_transaction starts a management transaction
* in the main database by recording the outer transaction's transaction id and setting
* IsMainDBCommand to true.
*/
Datum
start_management_transaction(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
OuterXid = PG_GETARG_FULLTRANSACTIONID(0);
IsMainDBCommand = true;
Use2PCForCoordinatedTransaction();
PG_RETURN_VOID();
}
/*
* execute_command_on_remote_nodes_as_user executes the query on the nodes
* other than the current node, using the user passed.
*/
Datum
execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
text *queryText = PG_GETARG_TEXT_P(0);
char *query = text_to_cstring(queryText);
text *usernameText = PG_GETARG_TEXT_P(1);
char *username = text_to_cstring(usernameText);
StringInfo queryToSend = makeStringInfo();
appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query,
ENABLE_METADATA_SYNC);
SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data);
PG_RETURN_VOID();
}
/*
 * commit_management_command_2pc is a SQL-callable wrapper around
 * RecoverTwoPhaseCommits. Note that it does not call
 * CoordinatedRemoteTransactionsCommit.
 *
 * It takes no arguments and returns void; the int returned by
 * RecoverTwoPhaseCommits is ignored. The caller must pass the
 * EnsureSuperUser() check.
 */
Datum
commit_management_command_2pc(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
RecoverTwoPhaseCommits();
PG_RETURN_VOID();
}
/*
* RunCitusMainDBQuery creates a connection to Citus main database if necessary
* and runs the query over the connection in the main database.
*/
void
RunCitusMainDBQuery(char *query)
{
if (MainDBConnection == NULL)
{
if (strlen(SuperuserRole) == 0)
{
ereport(ERROR, (errmsg("No superuser role is given for Citus main "
"database connection"),
errhint("Set citus.superuser to a superuser role name")));
}
int flags = 0;
MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName,
PostPortNumber,
SuperuserRole,
MainDb);
RemoteTransactionBegin(MainDBConnection);
}
SendRemoteCommand(MainDBConnection, query);
PGresult *result = GetRemoteCommandResult(MainDBConnection, true);
if (!IsResponseOK(result))
{
ReportResultError(MainDBConnection, result, ERROR);
}
ForgetResults(MainDBConnection);
}
/*
 * CleanCitusMainDBConnection closes the connection to the Citus main database,
 * if one is open, and clears MainDBConnection. It does nothing when there is
 * no connection.
 */
void
CleanCitusMainDBConnection(void)
{
	if (MainDBConnection != NULL)
	{
		CloseConnection(MainDBConnection);
		MainDBConnection = NULL;
	}
}
/*
* StartRemoteTransactionBegin initiates beginning the remote transaction in
@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
if (workerNode != NULL)
{
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid);
}
/*

View File

@ -21,6 +21,7 @@
#include "catalog/dependency.h"
#include "common/hashfn.h"
#include "nodes/print.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "utils/datum.h"
#include "utils/guc.h"
@ -46,6 +47,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shard_cleaner.h"
@ -55,6 +57,9 @@
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#define COMMIT_MANAGEMENT_COMMAND_2PC \
"SELECT citus_internal.commit_management_command_2pc()"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED &&
!IsMainDBCommand)
{
/* handles both already prepared and open transactions */
CoordinatedRemoteTransactionsCommit();
}
/*
* If this is a non-Citus main database we should try to commit the prepared
* transactions created by the Citus main database on the worker nodes.
*/
if (!IsMainDB && MainDBConnection != NULL)
{
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
CleanCitusMainDBConnection();
}
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
RemoveIntermediateResultsDirectories();
CleanCitusMainDBConnection();
/* handles both already prepared and open transactions */
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
{
@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
break;
}
/*
* If this is a non-Citus main database we should commit the Citus
* main database query. So if some error happens on the distributed main
* database query we wouldn't have committed the current query.
*/
if (!IsMainDB && MainDBConnection != NULL)
{
RunCitusMainDBQuery("COMMIT");
}
/*
* TODO: It'd probably be a good idea to force constraints and
* such to 'immediate' here. Deferred triggers might try to send
@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* us to mark failed placements as invalid. Better don't use
* this for anything important (i.e. DDL/metadata).
*/
CoordinatedRemoteTransactionsCommit();
if (IsMainDB)
{
CoordinatedRemoteTransactionsCommit();
}
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
}

View File

@ -29,10 +29,12 @@
#include "lib/stringinfo.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/xid8.h"
#include "pg_version_constants.h"
@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
* prepared transaction should be committed.
*/
void
LogTransactionRecord(int32 groupId, char *transactionName)
LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid)
{
Datum values[Natts_pg_dist_transaction];
bool isNulls[Natts_pg_dist_transaction];
@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName)
values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId);
values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName);
values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid);
/* open transaction relation and insert new tuple */
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
continue;
}
/* Check if the transaction is created by an outer transaction from a non-main database */
bool outerXidIsNull = false;
Datum outerXidDatum = heap_getattr(heapTuple,
Anum_pg_dist_transaction_outerxid,
tupleDescriptor, &outerXidIsNull);
TransactionId outerXid = 0;
if (!outerXidIsNull)
{
FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum);
outerXid = XidFromFullTransactionId(outerFullXid);
}
if (outerXid != 0)
{
bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid);
bool outerXactDidCommit = TransactionIdDidCommit(outerXid);
if (outerXactIsInProgress && !outerXactDidCommit)
{
/*
* The transaction is initiated from an outer transaction and the outer
* transaction is not yet committed, so we should not commit either.
* We remove this transaction from the pendingTransactionSet so it'll
* not be aborted by the loop below.
*/
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
&foundPreparedTransactionBeforeCommit);
continue;
}
else if (!outerXactIsInProgress && !outerXactDidCommit)
{
/*
* Since outer transaction isn't in progress and did not commit we need to
* abort the prepared transaction too. We do this by simply doing the same
* thing we would do for transactions that are initiated from the main
* database.
*/
continue;
}
else
{
/*
* Outer transaction did commit, so we can try to commit the prepared
* transaction too.
*/
}
}
/*
* Remove the transaction from the pending list such that only transactions
* that need to be aborted remain at the end.

View File

@ -234,7 +234,8 @@ List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
List *workerNodeList = NIL;
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
if (targetWorkerSet == ALL_SHARD_NODES ||
targetWorkerSet == METADATA_NODES)
{
workerNodeList = ActivePrimaryNodeList(lockMode);
}

View File

@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void);
extern List * CitusTableMetadataCreateCommandList(Oid relationId);
extern List * NodeMetadataDropCommands(void);
extern char * MarkObjectsDistributedCreateCommand(List *addresses,
List *names,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations);

View File

@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction;
* compiler constants for pg_dist_transaction
* ----------------
*/
#define Natts_pg_dist_transaction 2
#define Natts_pg_dist_transaction 3
#define Anum_pg_dist_transaction_groupid 1
#define Anum_pg_dist_transaction_gid 2
#define Anum_pg_dist_transaction_outerxid 3
#endif /* PG_DIST_TRANSACTION_H */

View File

@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId);
extern void RunCitusMainDBQuery(char *query);
extern void CleanCitusMainDBConnection(void);
extern bool IsMainDBCommand;
extern bool IsMainDB;
extern char *SuperuserRole;
extern char *MainDb;
extern struct MultiConnection *MainDBConnection;
#endif /* REMOTE_TRANSACTION_H */

View File

@ -17,7 +17,8 @@ extern int Recover2PCInterval;
/* Functions declarations for worker transactions */
extern void LogTransactionRecord(int32 groupId, char *transactionName);
extern void LogTransactionRecord(int32 groupId, char *transactionName,
FullTransactionId outerXid);
extern int RecoverTwoPhaseCommits(void);
extern void DeleteWorkerTransactions(WorkerNode *workerNode);

View File

@ -0,0 +1,154 @@
def test_main_commited_outer_not_yet(cluster):
c = cluster.coordinator
w0 = cluster.workers[0]
# create a non-main database
c.sql("CREATE DATABASE db1")
# we will use cur1 to simulate non-main database user and
# cur2 to manually do the steps we would do in the main database
with c.cur(dbname="db1") as cur1, c.cur() as cur2:
# let's start a transaction and find its transaction id
cur1.execute("BEGIN")
cur1.execute("SELECT txid_current()")
txid = cur1.fetchall()
# using the transaction id of the cur1 simulate the main database commands manually
cur2.execute("BEGIN")
cur2.execute(
"SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),)
)
cur2.execute(
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"
)
cur2.execute("COMMIT")
# run the transaction recovery
c.sql("SELECT recover_prepared_transactions()")
# user should not be created on the worker because outer transaction is not committed yet
role_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)
assert (
int(role_before_commit) == 0
), "role is on pg_dist_object despite not committing"
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
pdo_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"
# commit in cur1 so the transaction recovery thinks this is a successful transaction
cur1.execute("COMMIT")
# run the transaction recovery again after committing
c.sql("SELECT recover_prepared_transactions()")
# check that the user is created by the transaction recovery on the worker
role_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)
assert (
int(role_after_commit) == 1
), "role is not created during recovery despite committing"
# check that the user is on pg_dist_object on the worker after transaction recovery
pdo_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert (
int(pdo_after_commit) == 1
), "role is not on pg_dist_object after recovery despite committing"
c.sql("DROP DATABASE db1")
c.sql(
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')"
)
c.sql(
"""
SELECT run_command_on_workers($$
DELETE FROM pg_dist_object
WHERE objid::regrole::text = 'u1'
$$)
"""
)
def test_main_commited_outer_aborted(cluster):
    """Recovery must discard a management command whose outer transaction aborted.

    One cursor plays the outer (non-main database) transaction and the other
    runs the ``citus_internal`` management functions on the main database by
    hand. The outer transaction is aborted after the main database side has
    committed; the role and its ``pg_dist_object`` entry must be absent on the
    worker both before and after transaction recovery runs.

    ``cluster`` is the test fixture providing ``coordinator`` and ``workers``.
    """
    coordinator = cluster.coordinator
    worker = cluster.workers[0]

    # the same two counts are checked before and after recovery
    role_count_query = "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'"
    pdo_count_query = (
        "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
    )

    # the non-main database the outer transaction runs in
    coordinator.sql("CREATE DATABASE db2")

    # outer_cur stands in for the non-main database session, main_cur replays
    # by hand what would be sent to the main database
    with coordinator.cur(dbname="db2") as outer_cur, coordinator.cur() as main_cur:
        # open the outer transaction and learn its transaction id
        outer_cur.execute("BEGIN")
        outer_cur.execute("SELECT txid_current()")
        outer_rows = outer_cur.fetchall()

        # replay the main database commands with the outer transaction id
        main_cur.execute("BEGIN")
        main_cur.execute(
            "SELECT citus_internal.start_management_transaction(%s)",
            (str(outer_rows[0][0]),),
        )
        main_cur.execute(
            "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')"
        )
        main_cur.execute(
            "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)"
        )
        main_cur.execute("COMMIT")

        # aborting the outer transaction makes recovery treat the command as aborted
        outer_cur.execute("ABORT")

        # nothing may be visible on the worker before recovery ...
        roles_before = worker.sql_value(role_count_query)
        assert int(roles_before) == 0, "role is already created before recovery"

        pdo_before = worker.sql_value(pdo_count_query)
        assert int(pdo_before) == 0, "role is already on pg_dist_object before recovery"

        # ... and recovery must not bring it into existence either
        coordinator.sql("SELECT recover_prepared_transactions()")

        roles_after = worker.sql_value(role_count_query)
        assert int(roles_after) == 0, "role is created during recovery despite aborting"

        pdo_after = worker.sql_value(pdo_count_query)
        assert (
            int(pdo_after) == 0
        ), "role is on pg_dist_object after recovery despite aborting"

    # the database can only be dropped once both cursors are closed
    coordinator.sql("DROP DATABASE db2")

View File

@ -0,0 +1,154 @@
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA failure_non_main_db_2pc;
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
CREATE DATABASE other_db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db1
CREATE USER user_1;
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 | user_1
1 | user_1
2 |
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
1
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 | user_1
1 | user_1
2 | user_1
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db1
CREATE USER user_2;
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
while executing command on localhost:xxxxx
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 |
1 |
2 |
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 |
1 |
2 |
(3 rows)
DROP DATABASE other_db1;
-- user_2 should not exist because the query to create it will fail
-- but let's make sure we try to drop it just in case
DROP USER IF EXISTS user_1, user_2;
NOTICE: role "user_2" does not exist, skipping
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
CREATE DATABASE other_db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db2
CREATE USER user_3;
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
result
---------------------------------------------------------------------
user_3
user_3
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
result
---------------------------------------------------------------------
user_3
user_3
user_3
(3 rows)
DROP DATABASE other_db2;
DROP USER user_3;
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
result
---------------------------------------------------------------------
DELETE 1
DELETE 1
DELETE 1
(3 rows)
DROP SCHEMA failure_non_main_db_2pc;

View File

@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 12.2-1
ALTER EXTENSION citus UPDATE TO '12.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal.commit_management_command_2pc() void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.mark_object_distributed(oid,text,oid) void
| function citus_internal.start_management_transaction(xid8) void
| function citus_internal_database_command(text) void
(1 row)
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -64,7 +64,7 @@ SELECT recover_prepared_transactions();
(1 row)
-- delete the citus_122_should_do_nothing transaction
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
groupid | gid
---------------------------------------------------------------------
122 | citus_122_should_do_nothing

View File

@ -0,0 +1,130 @@
CREATE SCHEMA other_databases;
SET search_path TO other_databases;
SET citus.next_shard_id TO 10231023;
CREATE DATABASE other_db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c other_db1
SHOW citus.main_db;
citus.main_db
---------------------------------------------------------------------
regression
(1 row)
-- check that empty citus.superuser gives error
SET citus.superuser TO '';
CREATE USER empty_superuser;
ERROR: No superuser role is given for Citus main database connection
HINT: Set citus.superuser to a superuser role name
SET citus.superuser TO 'postgres';
CREATE USER other_db_user1;
CREATE USER other_db_user2;
BEGIN;
CREATE USER other_db_user3;
CREATE USER other_db_user4;
COMMIT;
BEGIN;
CREATE USER other_db_user5;
CREATE USER other_db_user6;
ROLLBACK;
BEGIN;
CREATE USER other_db_user7;
SELECT 1/0;
ERROR: division by zero
COMMIT;
CREATE USER other_db_user8;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user1
other_db_user2
other_db_user3
other_db_user4
other_db_user8
(5 rows)
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user1
other_db_user2
other_db_user3
other_db_user4
other_db_user8
(5 rows)
\c - - - :master_port
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
NOTICE: role "other_db_user5" does not exist, skipping
NOTICE: role "other_db_user6" does not exist, skipping
NOTICE: role "other_db_user7" does not exist, skipping
-- Make sure non-superuser roles cannot use internal GUCs
-- but they can still create a role
CREATE USER nonsuperuser CREATEROLE;
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
ERROR: operation is not allowed
HINT: Run the command with a superuser.
\c other_db1
CREATE USER other_db_user9;
RESET ROLE;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user9
(1 row)
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user9
(1 row)
\c - - - :master_port
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser;
-- test from a worker
\c - - - :worker_1_port
CREATE DATABASE other_db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c other_db2
CREATE USER worker_user1;
BEGIN;
CREATE USER worker_user2;
COMMIT;
BEGIN;
CREATE USER worker_user3;
ROLLBACK;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
worker_user1
worker_user2
(2 rows)
\c - - - :master_port
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
worker_user1
worker_user2
(2 rows)
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
NOTICE: role "worker_user3" does not exist, skipping
\c - - - :worker_1_port
DROP DATABASE other_db2;
\c - - - :master_port
DROP SCHEMA other_databases;
DROP DATABASE other_db1;

View File

@ -56,13 +56,17 @@ ORDER BY 1;
function citus_get_active_worker_nodes()
function citus_get_node_clock()
function citus_get_transaction_clock()
function citus_internal.commit_management_command_2pc()
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.mark_object_distributed(oid,text,oid)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
function citus_internal.pg_dist_shard_placement_trigger_func()
function citus_internal.refresh_isolation_tester_prepared_statement()
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.start_management_transaction(xid8)
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
@ -344,5 +348,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(334 rows)
(338 rows)

View File

@ -34,6 +34,7 @@ test: failure_multi_row_insert
test: failure_mx_metadata_sync
test: failure_mx_metadata_sync_multi_trans
test: failure_connection_establishment
test: failure_non_main_db_2pc
# this test syncs metadata to the workers
test: failure_failover_to_local_execution

View File

@ -108,6 +108,7 @@ test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
test: background_task_queue_monitor
test: other_databases
# Causal clock test
test: clock

View File

@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.stat_tenants_limit = 2");
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
push(@pgOptions, "citus.main_db = 'regression'");
push(@pgOptions, "citus.superuser = 'postgres'");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -0,0 +1,75 @@
SELECT citus.mitmproxy('conn.allow()');
CREATE SCHEMA failure_non_main_db_2pc;
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
CREATE DATABASE other_db1;
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
\c other_db1
CREATE USER user_1;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
\c other_db1
CREATE USER user_2;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
DROP DATABASE other_db1;
-- user_2 should not exist because the query to create it will fail
-- but let's make sure we try to drop it just in case
DROP USER IF EXISTS user_1, user_2;
SELECT citus_set_coordinator_host('localhost');
\c - - - :worker_1_port
CREATE DATABASE other_db2;
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
\c other_db2
CREATE USER user_3;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
DROP DATABASE other_db2;
DROP USER user_3;
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
DROP SCHEMA failure_non_main_db_2pc;

View File

@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing');
SELECT recover_prepared_transactions();
-- delete the citus_122_should_do_nothing transaction
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
ROLLBACK PREPARED 'citus_122_should_do_nothing';
SELECT count(*) FROM pg_dist_transaction;

View File

@ -0,0 +1,98 @@
CREATE SCHEMA other_databases;
SET search_path TO other_databases;
SET citus.next_shard_id TO 10231023;
CREATE DATABASE other_db1;
\c other_db1
SHOW citus.main_db;
-- check that empty citus.superuser gives error
SET citus.superuser TO '';
CREATE USER empty_superuser;
SET citus.superuser TO 'postgres';
CREATE USER other_db_user1;
CREATE USER other_db_user2;
BEGIN;
CREATE USER other_db_user3;
CREATE USER other_db_user4;
COMMIT;
BEGIN;
CREATE USER other_db_user5;
CREATE USER other_db_user6;
ROLLBACK;
BEGIN;
CREATE USER other_db_user7;
SELECT 1/0;
COMMIT;
CREATE USER other_db_user8;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :master_port
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
-- Make sure non-superuser roles cannot use internal GUCs
-- but they can still create a role
CREATE USER nonsuperuser CREATEROLE;
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
\c other_db1
CREATE USER other_db_user9;
RESET ROLE;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :master_port
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser;
-- test from a worker
\c - - - :worker_1_port
CREATE DATABASE other_db2;
\c other_db2
CREATE USER worker_user1;
BEGIN;
CREATE USER worker_user2;
COMMIT;
BEGIN;
CREATE USER worker_user3;
ROLLBACK;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
\c - - - :master_port
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
\c - - - :worker_1_port
DROP DATABASE other_db2;
\c - - - :master_port
DROP SCHEMA other_databases;
DROP DATABASE other_db1;