mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into reassign_owned_prop
commit
29e77edca2
|
@ -192,6 +192,25 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSetTablespaceStatement returns true if the statement is a SET TABLESPACE statement,
|
||||
* false otherwise.
|
||||
*/
|
||||
static bool
|
||||
IsSetTablespaceStatement(AlterDatabaseStmt *stmt)
|
||||
{
|
||||
DefElem *def = NULL;
|
||||
foreach_ptr(def, stmt->options)
|
||||
{
|
||||
if (strcmp(def->defname, "tablespace") == 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
||||
* postgres instance.
|
||||
|
@ -203,22 +222,38 @@ List *
|
|||
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
if (!ShouldPropagate())
|
||||
bool missingOk = false;
|
||||
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
||||
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname,
|
||||
missingOk);
|
||||
|
||||
if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
char *sql = DeparseTreeNode((Node *) stmt);
|
||||
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) sql,
|
||||
sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
if (IsSetTablespaceStatement(stmt))
|
||||
{
|
||||
/*
|
||||
* Set tablespace does not work inside a transaction.Therefore, we need to use
|
||||
* NontransactionalNodeDDLTask to run the command on the workers outside
|
||||
* the transaction block.
|
||||
*/
|
||||
|
||||
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
else
|
||||
{
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -256,6 +291,36 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
|
|||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* PreprocessAlterDatabaseRenameStmt is executed before the statement is applied to the local
|
||||
* postgres instance. In this stage we prepare ALTER DATABASE RENAME statement to be run on
|
||||
* all workers.
|
||||
*/
|
||||
List *
|
||||
PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString)
|
||||
{
|
||||
bool missingOk = false;
|
||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->newname,
|
||||
missingOk);
|
||||
|
||||
if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
|
||||
char *sql = DeparseTreeNode((Node *) stmt);
|
||||
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
|
||||
* postgres instance.
|
||||
|
|
|
@ -533,6 +533,16 @@ static DistributeObjectOps Database_Set = {
|
|||
.markDistributed = false,
|
||||
};
|
||||
|
||||
static DistributeObjectOps Database_Rename = {
|
||||
.deparse = DeparseAlterDatabaseRenameStmt,
|
||||
.qualify = NULL,
|
||||
.preprocess = NULL,
|
||||
.postprocess = PostprocessAlterDatabaseRenameStmt,
|
||||
.objectType = OBJECT_DATABASE,
|
||||
.operationType = DIST_OPS_ALTER,
|
||||
.address = NULL,
|
||||
.markDistributed = false,
|
||||
};
|
||||
|
||||
static DistributeObjectOps Domain_Alter = {
|
||||
.deparse = DeparseAlterDomainStmt,
|
||||
|
@ -2103,6 +2113,11 @@ GetDistributeObjectOps(Node *node)
|
|||
return &Collation_Rename;
|
||||
}
|
||||
|
||||
case OBJECT_DATABASE:
|
||||
{
|
||||
return &Database_Rename;
|
||||
}
|
||||
|
||||
case OBJECT_DOMAIN:
|
||||
{
|
||||
return &Domain_Rename;
|
||||
|
|
|
@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
|
|||
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(objectAddressList,
|
||||
NIL,
|
||||
distArgumentIndexList,
|
||||
colocationIdList,
|
||||
forceDelegationList);
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "access/htup_details.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "catalog/pg_database.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "commands/defrem.h"
|
||||
|
@ -44,6 +45,7 @@
|
|||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
@ -77,6 +79,7 @@
|
|||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/string_utils.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
|
@ -84,6 +87,13 @@
|
|||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
||||
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
|
||||
#define START_MANAGEMENT_TRANSACTION \
|
||||
"SELECT citus_internal.start_management_transaction('%lu')"
|
||||
#define MARK_OBJECT_DISTRIBUTED \
|
||||
"SELECT citus_internal.mark_object_distributed(%d, %s, %d)"
|
||||
|
||||
|
||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
|
||||
|
@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree);
|
|||
static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
|
||||
static bool IsDropSchemaOrDB(Node *parsetree);
|
||||
static bool ShouldCheckUndistributeCitusLocalTables(void);
|
||||
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
|
||||
static void RunPostprocessMainDBCommand(Node *parsetree);
|
||||
|
||||
/*
|
||||
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
|
||||
|
@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
|
|||
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
if (!IsMainDB)
|
||||
{
|
||||
RunPreprocessMainDBCommand(parsetree, queryString);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that utility commands do not behave any differently until CREATE
|
||||
* EXTENSION is invoked.
|
||||
|
@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
|
|||
PrevProcessUtility(pstmt, queryString, false, context,
|
||||
params, queryEnv, dest, completionTag);
|
||||
|
||||
if (!IsMainDB)
|
||||
{
|
||||
RunPostprocessMainDBCommand(parsetree);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
else if (IsA(parsetree, CallStmt))
|
||||
|
@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void)
|
|||
{
|
||||
return activeDropSchemaOrDBs > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main
|
||||
* database before query is run on the local node with PrevProcessUtility
|
||||
*/
|
||||
static void
|
||||
RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
|
||||
{
|
||||
if (IsA(parsetree, CreateRoleStmt))
|
||||
{
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
START_MANAGEMENT_TRANSACTION,
|
||||
GetCurrentFullTransactionId().value);
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
|
||||
quote_literal_cstr(queryString),
|
||||
quote_literal_cstr(CurrentUserName()));
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunPostprocessMainDBCommand runs the necessary commands for a query, in main
|
||||
* database after query is run on the local node with PrevProcessUtility
|
||||
*/
|
||||
static void
|
||||
RunPostprocessMainDBCommand(Node *parsetree)
|
||||
{
|
||||
if (IsA(parsetree, CreateRoleStmt))
|
||||
{
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree);
|
||||
Oid roleOid = get_role_oid(createRoleStmt->role, false);
|
||||
appendStringInfo(mainDBQuery,
|
||||
MARK_OBJECT_DISTRIBUTED,
|
||||
AuthIdRelationId,
|
||||
quote_literal_cstr(createRoleStmt->role),
|
||||
roleOid);
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -425,11 +425,13 @@ GetConnParam(const char *keyword)
|
|||
|
||||
/*
|
||||
* GetEffectiveConnKey checks whether there is any pooler configuration for the
|
||||
* provided key (host/port combination). The one case where this logic is not
|
||||
* applied is for loopback connections originating within the task tracker. If
|
||||
* a corresponding row is found in the poolinfo table, a modified (effective)
|
||||
* key is returned with the node, port, and dbname overridden, as applicable,
|
||||
* otherwise, the original key is returned unmodified.
|
||||
* provided key (host/port combination). If a corresponding row is found in the
|
||||
* poolinfo table, a modified (effective) key is returned with the node, port,
|
||||
* and dbname overridden, as applicable, otherwise, the original key is returned
|
||||
* unmodified.
|
||||
*
|
||||
* In the case of Citus non-main databases we just return the key, since we
|
||||
* would not have access to tables with worker information.
|
||||
*/
|
||||
ConnectionHashKey *
|
||||
GetEffectiveConnKey(ConnectionHashKey *key)
|
||||
|
@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
|
|||
return key;
|
||||
}
|
||||
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
/*
|
||||
* This happens when we connect to main database over localhost
|
||||
* from some non Citus database.
|
||||
*/
|
||||
return key;
|
||||
}
|
||||
|
||||
WorkerNode *worker = FindWorkerNode(key->hostname, key->port);
|
||||
|
||||
if (worker == NULL)
|
||||
{
|
||||
/* this can be hit when the key references an unknown node */
|
||||
|
|
|
@ -30,12 +30,14 @@
|
|||
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
|
||||
static void AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt);
|
||||
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
|
||||
static void AppendDefElemConnLimit(StringInfo buf, DefElem *def);
|
||||
static void AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt);
|
||||
static void AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt);
|
||||
static void AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt);
|
||||
static void AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt);
|
||||
static void AppendGrantDatabases(StringInfo buf, GrantStmt *stmt);
|
||||
static void AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname);
|
||||
|
||||
const DefElemOptionFormat create_database_option_formats[] = {
|
||||
const DefElemOptionFormat createDatabaseOptionFormats[] = {
|
||||
{ "owner", " OWNER %s", OPTION_FORMAT_STRING },
|
||||
{ "template", " TEMPLATE %s", OPTION_FORMAT_STRING },
|
||||
{ "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR },
|
||||
|
@ -53,6 +55,14 @@ const DefElemOptionFormat create_database_option_formats[] = {
|
|||
{ "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN }
|
||||
};
|
||||
|
||||
|
||||
const DefElemOptionFormat alterDatabaseOptionFormats[] = {
|
||||
{ "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN },
|
||||
{ "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN },
|
||||
{ "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER },
|
||||
};
|
||||
|
||||
|
||||
char *
|
||||
DeparseAlterDatabaseOwnerStmt(Node *node)
|
||||
{
|
||||
|
@ -112,48 +122,63 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
|
|||
|
||||
|
||||
static void
|
||||
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
|
||||
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||
{
|
||||
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def));
|
||||
if (list_length(stmt->options) == 0)
|
||||
{
|
||||
elog(ERROR, "got unexpected number of options for ALTER DATABASE");
|
||||
}
|
||||
|
||||
if (stmt->options)
|
||||
{
|
||||
DefElem *firstOption = linitial(stmt->options);
|
||||
if (strcmp(firstOption->defname, "tablespace") == 0)
|
||||
{
|
||||
AppendAlterDatabaseSetTablespace(buf, firstOption, stmt->dbname);
|
||||
|
||||
/* SET tablespace cannot be combined with other options */
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
appendStringInfo(buf, "ALTER DATABASE %s WITH",
|
||||
quote_identifier(stmt->dbname));
|
||||
|
||||
AppendBasicAlterDatabaseOptions(buf, stmt);
|
||||
}
|
||||
|
||||
appendStringInfo(buf, ";");
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||
AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname)
|
||||
{
|
||||
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
|
||||
appendStringInfo(buf,
|
||||
"ALTER DATABASE %s SET TABLESPACE %s",
|
||||
quote_identifier(dbname), quote_identifier(defGetString(def)));
|
||||
}
|
||||
|
||||
if (stmt->options)
|
||||
|
||||
/*
|
||||
* AppendBasicAlterDatabaseOptions appends basic ALTER DATABASE options to a string buffer.
|
||||
* Basic options are those that can be appended to the ALTER DATABASE statement
|
||||
* after the "WITH" keyword.(i.e. ALLOW_CONNECTIONS, CONNECTION LIMIT, IS_TEMPLATE)
|
||||
* For example, the tablespace option is not a basic option since it is defined via SET keyword.
|
||||
*
|
||||
* This function takes a string buffer and an AlterDatabaseStmt as input.
|
||||
* It appends the basic options to the string buffer.
|
||||
*
|
||||
*/
|
||||
static void
|
||||
AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||
{
|
||||
DefElem *def = NULL;
|
||||
foreach_ptr(def, stmt->options)
|
||||
{
|
||||
ListCell *cell = NULL;
|
||||
appendStringInfo(buf, "WITH ");
|
||||
foreach(cell, stmt->options)
|
||||
{
|
||||
DefElem *def = castNode(DefElem, lfirst(cell));
|
||||
if (strcmp(def->defname, "is_template") == 0)
|
||||
{
|
||||
appendStringInfo(buf, "IS_TEMPLATE %s",
|
||||
quote_literal_cstr(strVal(def->arg)));
|
||||
}
|
||||
else if (strcmp(def->defname, "connection_limit") == 0)
|
||||
{
|
||||
AppendDefElemConnLimit(buf, def);
|
||||
}
|
||||
else if (strcmp(def->defname, "allow_connections") == 0)
|
||||
{
|
||||
ereport(ERROR,
|
||||
errmsg("ALLOW_CONNECTIONS is not supported"));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR,
|
||||
errmsg("unrecognized ALTER DATABASE option: %s",
|
||||
def->defname));
|
||||
}
|
||||
}
|
||||
DefElemOptionToStatement(buf, def, alterDatabaseOptionFormats, lengthof(
|
||||
alterDatabaseOptionFormats));
|
||||
}
|
||||
|
||||
appendStringInfo(buf, ";");
|
||||
}
|
||||
|
||||
|
||||
|
@ -216,6 +241,22 @@ AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt)
|
|||
}
|
||||
|
||||
|
||||
char *
|
||||
DeparseAlterDatabaseRenameStmt(Node *node)
|
||||
{
|
||||
RenameStmt *stmt = (RenameStmt *) node;
|
||||
|
||||
StringInfoData str;
|
||||
initStringInfo(&str);
|
||||
|
||||
appendStringInfo(&str, "ALTER DATABASE %s RENAME TO %s",
|
||||
quote_identifier(stmt->subname),
|
||||
quote_identifier(stmt->newname));
|
||||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
char *
|
||||
DeparseAlterDatabaseSetStmt(Node *node)
|
||||
{
|
||||
|
@ -246,8 +287,8 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt)
|
|||
DefElem *option = NULL;
|
||||
foreach_ptr(option, stmt->options)
|
||||
{
|
||||
DefElemOptionToStatement(buf, option, create_database_option_formats,
|
||||
lengthof(create_database_option_formats));
|
||||
DefElemOptionToStatement(buf, option, createDatabaseOptionFormats,
|
||||
lengthof(createDatabaseOptionFormats));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,18 +49,42 @@
|
|||
#include "distributed/metadata/pg_dist_object.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress,
|
||||
char *objectName);
|
||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
Datum *paramValues);
|
||||
static bool IsObjectDistributed(const ObjectAddress *address);
|
||||
|
||||
PG_FUNCTION_INFO_V1(mark_object_distributed);
|
||||
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
|
||||
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
|
||||
|
||||
|
||||
/*
|
||||
* mark_object_distributed adds an object to pg_dist_object
|
||||
* in all of the nodes.
|
||||
*/
|
||||
Datum
|
||||
mark_object_distributed(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureSuperUser();
|
||||
|
||||
Oid classId = PG_GETARG_OID(0);
|
||||
text *objectNameText = PG_GETARG_TEXT_P(1);
|
||||
char *objectName = text_to_cstring(objectNameText);
|
||||
Oid objectId = PG_GETARG_OID(2);
|
||||
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
|
||||
ObjectAddressSet(*objectAddress, classId, objectId);
|
||||
MarkObjectDistributedWithName(objectAddress, objectName);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
|
||||
*
|
||||
|
@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address)
|
|||
void
|
||||
MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||
{
|
||||
MarkObjectDistributedWithName(distAddress, "");
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkObjectDistributedWithName marks an object as a distributed object.
|
||||
* Same as MarkObjectDistributed but this function also allows passing an objectName
|
||||
* that is used in case the object does not exists for the current transaction.
|
||||
*/
|
||||
void
|
||||
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
|
||||
{
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
|
||||
}
|
||||
MarkObjectDistributedLocally(distAddress);
|
||||
|
||||
if (EnableMetadataSync)
|
||||
{
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
CreatePgDistObjectEntryCommand(distAddress);
|
||||
CreatePgDistObjectEntryCommand(distAddress, objectName);
|
||||
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
}
|
||||
|
@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
|
|||
if (EnableMetadataSync)
|
||||
{
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
CreatePgDistObjectEntryCommand(distAddress);
|
||||
CreatePgDistObjectEntryCommand(distAddress, "");
|
||||
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
}
|
||||
|
@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId)
|
|||
* for the given object address.
|
||||
*/
|
||||
static char *
|
||||
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
|
||||
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName)
|
||||
{
|
||||
/* create a list by adding the address of value to not to have warning */
|
||||
List *objectAddressList =
|
||||
list_make1((ObjectAddress *) objectAddress);
|
||||
|
||||
/* names also require a list so we create a nested list here */
|
||||
List *objectNameList = list_make1(list_make1((char *) objectName));
|
||||
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
|
||||
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
|
||||
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
|
||||
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(objectAddressList,
|
||||
objectNameList,
|
||||
distArgumetIndexList,
|
||||
colocationIdList,
|
||||
forceDelegationList);
|
||||
|
|
|
@ -79,6 +79,7 @@
|
|||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/pg_dist_placement.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/utils/array_type.h"
|
||||
|
@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId)
|
|||
char *
|
||||
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
|
||||
{
|
||||
/*
|
||||
* Citus will not be loaded when we run a global DDL command from a
|
||||
* Citus non-main database.
|
||||
*/
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
return "";
|
||||
}
|
||||
char *authinfo = "";
|
||||
Datum nodeIdDatumArray[2] = {
|
||||
Int32GetDatum(nodeId),
|
||||
|
|
|
@ -83,6 +83,7 @@
|
|||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/tenant_schema_metadata.h"
|
||||
#include "distributed/utils/array_type.h"
|
||||
|
@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
|
|||
*/
|
||||
char *
|
||||
MarkObjectsDistributedCreateCommand(List *addresses,
|
||||
List *namesArg,
|
||||
List *distributionArgumentIndexes,
|
||||
List *colocationIds,
|
||||
List *forceDelegations)
|
||||
|
@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses,
|
|||
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
|
||||
List *names = NIL;
|
||||
List *args = NIL;
|
||||
char *objectType = NULL;
|
||||
|
||||
char *objectType = getObjectTypeDescription(address, false);
|
||||
getObjectIdentityParts(address, &names, &args, false);
|
||||
if (IsMainDBCommand)
|
||||
{
|
||||
/*
|
||||
* When we try to distribute an object that's being created in a non Citus
|
||||
* main database, we cannot find the name, since the object is not visible
|
||||
* in Citus main database.
|
||||
* Because of that we need to pass the name to this function.
|
||||
*/
|
||||
names = list_nth(namesArg, currentObjectCounter);
|
||||
bool missingOk = false;
|
||||
objectType = getObjectTypeDescription(address, missingOk);
|
||||
}
|
||||
else
|
||||
{
|
||||
objectType = getObjectTypeDescription(address, false);
|
||||
getObjectIdentityParts(address, &names, &args, IsMainDBCommand);
|
||||
}
|
||||
|
||||
if (!isFirstObject)
|
||||
{
|
||||
|
@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context)
|
|||
|
||||
char *workerMetadataUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(list_make1(address),
|
||||
NIL,
|
||||
list_make1_int(distributionArgumentIndex),
|
||||
list_make1_int(colocationId),
|
||||
list_make1_int(forceDelegation));
|
||||
|
|
|
@ -94,6 +94,7 @@
|
|||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
#include "distributed/repartition_executor.h"
|
||||
#include "distributed/replication_origin_session_utils.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
|
@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"citus.superuser",
|
||||
gettext_noop("Name of a superuser role to be used in Citus main database "
|
||||
"connections"),
|
||||
NULL,
|
||||
&SuperuserRole,
|
||||
"",
|
||||
PGC_SUSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.task_assignment_policy",
|
||||
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
||||
|
@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status)
|
|||
*/
|
||||
InitializeBackendData(port->application_name);
|
||||
|
||||
IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 ||
|
||||
strncmp(MainDb, port->database_name, NAMEDATALEN) == 0);
|
||||
|
||||
/* let other authentication hooks to kick in first */
|
||||
if (original_client_auth_hook)
|
||||
|
|
|
@ -3,3 +3,10 @@
|
|||
|
||||
#include "udfs/citus_internal_database_command/12.2-1.sql"
|
||||
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
|
||||
|
||||
#include "udfs/start_management_transaction/12.2-1.sql"
|
||||
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
|
||||
#include "udfs/mark_object_distributed/12.2-1.sql"
|
||||
#include "udfs/commit_management_command_2pc/12.2-1.sql"
|
||||
|
||||
ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;
|
||||
|
|
|
@ -3,3 +3,20 @@
|
|||
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
||||
|
||||
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
|
||||
|
||||
DROP FUNCTION citus_internal.start_management_transaction(
|
||||
outer_xid xid8
|
||||
);
|
||||
|
||||
DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(
|
||||
query text,
|
||||
username text
|
||||
);
|
||||
|
||||
DROP FUNCTION citus_internal.mark_object_distributed(
|
||||
classId Oid, objectName text, objectId Oid
|
||||
);
|
||||
|
||||
DROP FUNCTION citus_internal.commit_management_command_2pc();
|
||||
|
||||
ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid;
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
|
||||
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
|
||||
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';
|
7
src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql
generated
Normal file
7
src/backend/distributed/sql/udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql
generated
Normal file
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
|
||||
IS 'executes a query on the nodes other than the current one';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
|
||||
IS 'executes a query on the nodes other than the current one';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
|
||||
IS 'adds an object to pg_dist_object on all nodes';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
|
||||
IS 'adds an object to pg_dist_object on all nodes';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
|
||||
IS 'internal Citus function that starts a management transaction in the main database';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
|
||||
|
||||
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
|
||||
IS 'internal Citus function that starts a management transaction in the main database';
|
|
@ -19,14 +19,19 @@
|
|||
#include "miscadmin.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/xid8.h"
|
||||
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/citus_safe_lib.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
|
@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection
|
|||
|
||||
static void Assign2PCIdentifier(MultiConnection *connection);
|
||||
|
||||
PG_FUNCTION_INFO_V1(start_management_transaction);
|
||||
PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user);
|
||||
PG_FUNCTION_INFO_V1(commit_management_command_2pc);
|
||||
|
||||
static char *IsolationLevelName[] = {
|
||||
"READ UNCOMMITTED",
|
||||
|
@ -64,6 +72,154 @@ static char *IsolationLevelName[] = {
|
|||
"SERIALIZABLE"
|
||||
};
|
||||
|
||||
/*
|
||||
* These variables are necessary for running queries from a database that is not
|
||||
* the Citus main database. Some of these queries need to be propagated to the
|
||||
* workers and Citus main database will be used for these queries, such as
|
||||
* CREATE ROLE. For that we create a connection to the Citus main database and
|
||||
* run queries from there.
|
||||
*/
|
||||
|
||||
/* The MultiConnection used for connecting Citus main database. */
|
||||
MultiConnection *MainDBConnection = NULL;
|
||||
|
||||
/*
|
||||
* IsMainDBCommand is true if this is a query in the Citus main database that is started
|
||||
* by a query from a different database.
|
||||
*/
|
||||
bool IsMainDBCommand = false;
|
||||
|
||||
/*
|
||||
* The transaction id of the query from the other database that started the
|
||||
* main database query.
|
||||
*/
|
||||
FullTransactionId OuterXid;
|
||||
|
||||
/*
|
||||
* Shows if this is the Citus main database or not. We needed a variable instead of
|
||||
* checking if this database's name is the same as MainDb because we sometimes need
|
||||
* this value outside a transaction where we cannot reach the current database name.
|
||||
*/
|
||||
bool IsMainDB = true;
|
||||
|
||||
/*
|
||||
* Name of a superuser role to be used during main database connections.
|
||||
*/
|
||||
char *SuperuserRole = NULL;
|
||||
|
||||
|
||||
/*
|
||||
* start_management_transaction starts a management transaction
|
||||
* in the main database by recording the outer transaction's transaction id and setting
|
||||
* IsMainDBCommand to true.
|
||||
*/
|
||||
Datum
|
||||
start_management_transaction(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureSuperUser();
|
||||
|
||||
OuterXid = PG_GETARG_FULLTRANSACTIONID(0);
|
||||
IsMainDBCommand = true;
|
||||
|
||||
Use2PCForCoordinatedTransaction();
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* execute_command_on_remote_nodes_as_user executes the query on the nodes
|
||||
* other than the current node, using the user passed.
|
||||
*/
|
||||
Datum
|
||||
execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureSuperUser();
|
||||
|
||||
text *queryText = PG_GETARG_TEXT_P(0);
|
||||
char *query = text_to_cstring(queryText);
|
||||
|
||||
text *usernameText = PG_GETARG_TEXT_P(1);
|
||||
char *username = text_to_cstring(usernameText);
|
||||
|
||||
StringInfo queryToSend = makeStringInfo();
|
||||
|
||||
appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query,
|
||||
ENABLE_METADATA_SYNC);
|
||||
|
||||
SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* commit_management_command_2pc is a wrapper UDF for
|
||||
* CoordinatedRemoteTransactionsCommit
|
||||
*/
|
||||
Datum
|
||||
commit_management_command_2pc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureSuperUser();
|
||||
|
||||
RecoverTwoPhaseCommits();
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunCitusMainDBQuery creates a connection to Citus main database if necessary
|
||||
* and runs the query over the connection in the main database.
|
||||
*/
|
||||
void
|
||||
RunCitusMainDBQuery(char *query)
|
||||
{
|
||||
if (MainDBConnection == NULL)
|
||||
{
|
||||
if (strlen(SuperuserRole) == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("No superuser role is given for Citus main "
|
||||
"database connection"),
|
||||
errhint("Set citus.superuser to a superuser role name")));
|
||||
}
|
||||
int flags = 0;
|
||||
MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName,
|
||||
PostPortNumber,
|
||||
SuperuserRole,
|
||||
MainDb);
|
||||
RemoteTransactionBegin(MainDBConnection);
|
||||
}
|
||||
|
||||
SendRemoteCommand(MainDBConnection, query);
|
||||
|
||||
PGresult *result = GetRemoteCommandResult(MainDBConnection, true);
|
||||
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(MainDBConnection, result, ERROR);
|
||||
}
|
||||
|
||||
ForgetResults(MainDBConnection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CleanCitusMainDBConnection closes and removes the connection to Citus main database.
|
||||
*/
|
||||
void
|
||||
CleanCitusMainDBConnection(void)
|
||||
{
|
||||
if (MainDBConnection == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
CloseConnection(MainDBConnection);
|
||||
MainDBConnection = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartRemoteTransactionBegin initiates beginning the remote transaction in
|
||||
|
@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
|
|||
WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
|
||||
if (workerNode != NULL)
|
||||
{
|
||||
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
|
||||
LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "catalog/dependency.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "nodes/print.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/datum.h"
|
||||
#include "utils/guc.h"
|
||||
|
@ -46,6 +47,7 @@
|
|||
#include "distributed/multi_logical_replication.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/repartition_join_execution.h"
|
||||
#include "distributed/replication_origin_session_utils.h"
|
||||
#include "distributed/shard_cleaner.h"
|
||||
|
@ -55,6 +57,9 @@
|
|||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_log_messages.h"
|
||||
|
||||
#define COMMIT_MANAGEMENT_COMMAND_2PC \
|
||||
"SELECT citus_internal.commit_management_command_2pc()"
|
||||
|
||||
|
||||
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||
|
||||
|
@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
MemoryContext previousContext =
|
||||
MemoryContextSwitchTo(CitusXactCallbackContext);
|
||||
|
||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
|
||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED &&
|
||||
!IsMainDBCommand)
|
||||
{
|
||||
/* handles both already prepared and open transactions */
|
||||
CoordinatedRemoteTransactionsCommit();
|
||||
}
|
||||
|
||||
/*
|
||||
* If this is a non-Citus main database we should try to commit the prepared
|
||||
* transactions created by the Citus main database on the worker nodes.
|
||||
*/
|
||||
if (!IsMainDB && MainDBConnection != NULL)
|
||||
{
|
||||
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
|
||||
CleanCitusMainDBConnection();
|
||||
}
|
||||
|
||||
/* close connections etc. */
|
||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
||||
{
|
||||
|
@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
|
||||
RemoveIntermediateResultsDirectories();
|
||||
|
||||
CleanCitusMainDBConnection();
|
||||
|
||||
/* handles both already prepared and open transactions */
|
||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
||||
{
|
||||
|
@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
break;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* If this is a non-Citus main database we should commit the Citus
|
||||
* main database query. So if some error happens on the distributed main
|
||||
* database query we wouldn't have committed the current query.
|
||||
*/
|
||||
if (!IsMainDB && MainDBConnection != NULL)
|
||||
{
|
||||
RunCitusMainDBQuery("COMMIT");
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: It'd probably be a good idea to force constraints and
|
||||
* such to 'immediate' here. Deferred triggers might try to send
|
||||
|
@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
* us to mark failed placements as invalid. Better don't use
|
||||
* this for anything important (i.e. DDL/metadata).
|
||||
*/
|
||||
CoordinatedRemoteTransactionsCommit();
|
||||
if (IsMainDB)
|
||||
{
|
||||
CoordinatedRemoteTransactionsCommit();
|
||||
}
|
||||
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,10 +29,12 @@
|
|||
#include "lib/stringinfo.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/lock.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/xid8.h"
|
||||
|
||||
#include "pg_version_constants.h"
|
||||
|
||||
|
@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
|
|||
* prepared transaction should be committed.
|
||||
*/
|
||||
void
|
||||
LogTransactionRecord(int32 groupId, char *transactionName)
|
||||
LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid)
|
||||
{
|
||||
Datum values[Natts_pg_dist_transaction];
|
||||
bool isNulls[Natts_pg_dist_transaction];
|
||||
|
@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName)
|
|||
|
||||
values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId);
|
||||
values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName);
|
||||
values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid);
|
||||
|
||||
/* open transaction relation and insert new tuple */
|
||||
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
|
||||
|
@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
continue;
|
||||
}
|
||||
|
||||
/* Check if the transaction is created by an outer transaction from a non-main database */
|
||||
bool outerXidIsNull = false;
|
||||
Datum outerXidDatum = heap_getattr(heapTuple,
|
||||
Anum_pg_dist_transaction_outerxid,
|
||||
tupleDescriptor, &outerXidIsNull);
|
||||
|
||||
TransactionId outerXid = 0;
|
||||
if (!outerXidIsNull)
|
||||
{
|
||||
FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum);
|
||||
outerXid = XidFromFullTransactionId(outerFullXid);
|
||||
}
|
||||
|
||||
if (outerXid != 0)
|
||||
{
|
||||
bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid);
|
||||
bool outerXactDidCommit = TransactionIdDidCommit(outerXid);
|
||||
if (outerXactIsInProgress && !outerXactDidCommit)
|
||||
{
|
||||
/*
|
||||
* The transaction is initiated from an outer transaction and the outer
|
||||
* transaction is not yet committed, so we should not commit either.
|
||||
* We remove this transaction from the pendingTransactionSet so it'll
|
||||
* not be aborted by the loop below.
|
||||
*/
|
||||
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
|
||||
&foundPreparedTransactionBeforeCommit);
|
||||
continue;
|
||||
}
|
||||
else if (!outerXactIsInProgress && !outerXactDidCommit)
|
||||
{
|
||||
/*
|
||||
* Since outer transaction isn't in progress and did not commit we need to
|
||||
* abort the prepared transaction too. We do this by simply doing the same
|
||||
* thing we would do for transactions that are initiated from the main
|
||||
* database.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Outer transaction did commit, so we can try to commit the prepared
|
||||
* transaction too.
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove the transaction from the pending list such that only transactions
|
||||
* that need to be aborted remain at the end.
|
||||
|
|
|
@ -234,7 +234,8 @@ List *
|
|||
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
|
||||
{
|
||||
List *workerNodeList = NIL;
|
||||
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
|
||||
if (targetWorkerSet == ALL_SHARD_NODES ||
|
||||
targetWorkerSet == METADATA_NODES)
|
||||
{
|
||||
workerNodeList = ActivePrimaryNodeList(lockMode);
|
||||
}
|
||||
|
|
|
@ -244,6 +244,8 @@ extern List * DropDatabaseStmtObjectAddress(Node *node, bool missingOk,
|
|||
bool isPostprocess);
|
||||
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missingOk,
|
||||
bool isPostprocess);
|
||||
extern List * GenerateGrantDatabaseCommandList(void);
|
||||
extern List * PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString);
|
||||
extern void EnsureSupportedCreateDatabaseCommand(CreatedbStmt *stmt);
|
||||
extern char * CreateDatabaseDDLCommand(Oid dbId);
|
||||
|
||||
|
|
|
@ -252,6 +252,7 @@ extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
|
|||
extern char * DeparseAlterDatabaseSetStmt(Node *node);
|
||||
extern char * DeparseCreateDatabaseStmt(Node *node);
|
||||
extern char * DeparseDropDatabaseStmt(Node *node);
|
||||
extern char * DeparseAlterDatabaseRenameStmt(Node *node);
|
||||
|
||||
|
||||
/* forward declaration for deparse_publication_stmts.c */
|
||||
|
|
|
@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
|||
extern bool IsAnyObjectDistributed(const List *addresses);
|
||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name);
|
||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||
|
|
|
@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void);
|
|||
extern List * CitusTableMetadataCreateCommandList(Oid relationId);
|
||||
extern List * NodeMetadataDropCommands(void);
|
||||
extern char * MarkObjectsDistributedCreateCommand(List *addresses,
|
||||
List *names,
|
||||
List *distributionArgumentIndexes,
|
||||
List *colocationIds,
|
||||
List *forceDelegations);
|
||||
|
|
|
@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction;
|
|||
* compiler constants for pg_dist_transaction
|
||||
* ----------------
|
||||
*/
|
||||
#define Natts_pg_dist_transaction 2
|
||||
#define Natts_pg_dist_transaction 3
|
||||
#define Anum_pg_dist_transaction_groupid 1
|
||||
#define Anum_pg_dist_transaction_gid 2
|
||||
#define Anum_pg_dist_transaction_outerxid 3
|
||||
|
||||
|
||||
#endif /* PG_DIST_TRANSACTION_H */
|
||||
|
|
|
@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);
|
|||
extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId);
|
||||
extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId);
|
||||
|
||||
extern void RunCitusMainDBQuery(char *query);
|
||||
extern void CleanCitusMainDBConnection(void);
|
||||
|
||||
extern bool IsMainDBCommand;
|
||||
extern bool IsMainDB;
|
||||
extern char *SuperuserRole;
|
||||
extern char *MainDb;
|
||||
extern struct MultiConnection *MainDBConnection;
|
||||
|
||||
#endif /* REMOTE_TRANSACTION_H */
|
||||
|
|
|
@ -17,7 +17,8 @@ extern int Recover2PCInterval;
|
|||
|
||||
|
||||
/* Functions declarations for worker transactions */
|
||||
extern void LogTransactionRecord(int32 groupId, char *transactionName);
|
||||
extern void LogTransactionRecord(int32 groupId, char *transactionName,
|
||||
FullTransactionId outerXid);
|
||||
extern int RecoverTwoPhaseCommits(void);
|
||||
extern void DeleteWorkerTransactions(WorkerNode *workerNode);
|
||||
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
def test_main_commited_outer_not_yet(cluster):
|
||||
c = cluster.coordinator
|
||||
w0 = cluster.workers[0]
|
||||
|
||||
# create a non-main database
|
||||
c.sql("CREATE DATABASE db1")
|
||||
|
||||
# we will use cur1 to simulate non-main database user and
|
||||
# cur2 to manually do the steps we would do in the main database
|
||||
with c.cur(dbname="db1") as cur1, c.cur() as cur2:
|
||||
# let's start a transaction and find its transaction id
|
||||
cur1.execute("BEGIN")
|
||||
cur1.execute("SELECT txid_current()")
|
||||
txid = cur1.fetchall()
|
||||
|
||||
# using the transaction id of the cur1 simulate the main database commands manually
|
||||
cur2.execute("BEGIN")
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),)
|
||||
)
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
|
||||
)
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"
|
||||
)
|
||||
cur2.execute("COMMIT")
|
||||
|
||||
# run the transaction recovery
|
||||
c.sql("SELECT recover_prepared_transactions()")
|
||||
|
||||
# user should not be created on the worker because outer transaction is not committed yet
|
||||
role_before_commit = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(role_before_commit) == 0
|
||||
), "role is on pg_dist_object despite not committing"
|
||||
|
||||
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
|
||||
pdo_before_commit = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
||||
)
|
||||
|
||||
assert int(pdo_before_commit) == 0, "role is created despite not committing"
|
||||
|
||||
# commit in cur1 so the transaction recovery thinks this is a successful transaction
|
||||
cur1.execute("COMMIT")
|
||||
|
||||
# run the transaction recovery again after committing
|
||||
c.sql("SELECT recover_prepared_transactions()")
|
||||
|
||||
# check that the user is created by the transaction recovery on the worker
|
||||
role_after_commit = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(role_after_commit) == 1
|
||||
), "role is not created during recovery despite committing"
|
||||
|
||||
# check that the user is on pg_dist_object on the worker after transaction recovery
|
||||
pdo_after_commit = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(pdo_after_commit) == 1
|
||||
), "role is not on pg_dist_object after recovery despite committing"
|
||||
|
||||
c.sql("DROP DATABASE db1")
|
||||
c.sql(
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')"
|
||||
)
|
||||
c.sql(
|
||||
"""
|
||||
SELECT run_command_on_workers($$
|
||||
DELETE FROM pg_dist_object
|
||||
WHERE objid::regrole::text = 'u1'
|
||||
$$)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def test_main_commited_outer_aborted(cluster):
|
||||
c = cluster.coordinator
|
||||
w0 = cluster.workers[0]
|
||||
|
||||
# create a non-main database
|
||||
c.sql("CREATE DATABASE db2")
|
||||
|
||||
# we will use cur1 to simulate non-main database user and
|
||||
# cur2 to manually do the steps we would do in the main database
|
||||
with c.cur(dbname="db2") as cur1, c.cur() as cur2:
|
||||
# let's start a transaction and find its transaction id
|
||||
cur1.execute("BEGIN")
|
||||
cur1.execute("SELECT txid_current()")
|
||||
txid = cur1.fetchall()
|
||||
|
||||
# using the transaction id of the cur1 simulate the main database commands manually
|
||||
cur2.execute("BEGIN")
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),)
|
||||
)
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')"
|
||||
)
|
||||
cur2.execute(
|
||||
"SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)"
|
||||
)
|
||||
cur2.execute("COMMIT")
|
||||
|
||||
# abort cur1 so the transaction recovery thinks this is an aborted transaction
|
||||
cur1.execute("ABORT")
|
||||
|
||||
# check that the user is not yet created on the worker
|
||||
role_before_recovery = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_roles WHERE rolname = 'u2'"
|
||||
)
|
||||
|
||||
assert int(role_before_recovery) == 0, "role is already created before recovery"
|
||||
|
||||
# check that the user is not on pg_dist_object on the worker
|
||||
pdo_before_recovery = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(pdo_before_recovery) == 0
|
||||
), "role is already on pg_dist_object before recovery"
|
||||
|
||||
# run the transaction recovery
|
||||
c.sql("SELECT recover_prepared_transactions()")
|
||||
|
||||
# check that the user is not created by the transaction recovery on the worker
|
||||
role_after_recovery = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_roles WHERE rolname = 'u2'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(role_after_recovery) == 0
|
||||
), "role is created during recovery despite aborting"
|
||||
|
||||
# check that the user is not on pg_dist_object on the worker after transaction recovery
|
||||
pdo_after_recovery = w0.sql_value(
|
||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
||||
)
|
||||
|
||||
assert (
|
||||
int(pdo_after_recovery) == 0
|
||||
), "role is on pg_dist_object after recovery despite aborting"
|
||||
|
||||
c.sql("DROP DATABASE db2")
|
|
@ -1,38 +1,30 @@
|
|||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
-- since ALLOW_CONNECTIONS alter option should be executed in a different database
|
||||
-- and since we don't have a multiple database support for now,
|
||||
-- this statement will get error
|
||||
alter database regression ALLOW_CONNECTIONS false;
|
||||
ERROR: ALLOW_CONNECTIONS is not supported
|
||||
alter database regression with CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with CONNECTION LIMIT -1;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE true;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database regression with IS_TEMPLATE false;
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
|
||||
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
-- this statement will get error since we don't have a multiple database support for now
|
||||
alter database regression rename to regression2;
|
||||
ERROR: current database cannot be renamed
|
||||
alter database regression set default_transaction_read_only = true;
|
||||
NOTICE: issuing ALTER DATABASE regression SET default_transaction_read_only = 'true'
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
|
@ -147,4 +139,86 @@ NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
set citus.enable_create_database_propagation=on;
|
||||
create database "regression!'2";
|
||||
alter database "regression!'2" with CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database "regression!'2" with IS_TEMPLATE false;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
\c - - - :worker_1_port
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
\c - - - :worker_2_port
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
\c - - - :master_port
|
||||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
alter database "regression!'2" set TABLESPACE alter_db_tablespace;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
set citus.enable_create_database_propagation=on;
|
||||
alter database "regression!'2" rename to regression3;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
-- check that the local database rename and alter comnmand is not propagated
|
||||
set citus.enable_create_database_propagation=off;
|
||||
CREATE database local_regression;
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||
HINT: You can manually create a database and its extensions on workers.
|
||||
alter DATABASE local_regression with CONNECTION LIMIT 100;
|
||||
alter DATABASE local_regression rename to local_regression2;
|
||||
drop database local_regression2;
|
||||
set citus.enable_create_database_propagation=on;
|
||||
drop database regression3;
|
||||
create database "regression!'4";
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
|
||||
$$
|
||||
);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
ALTER TABLESPACE
|
||||
ALTER TABLESPACE
|
||||
ALTER TABLESPACE
|
||||
(3 rows)
|
||||
|
||||
alter database "regression!'4" set TABLESPACE "ts-needs\!escape";
|
||||
NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
drop database "regression!'4";
|
||||
set citus.log_remote_commands = false;
|
||||
set citus.enable_create_database_propagation=off;
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
drop tablespace "ts-needs\!escape"
|
||||
$$
|
||||
);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
DROP TABLESPACE
|
||||
DROP TABLESPACE
|
||||
DROP TABLESPACE
|
||||
(3 rows)
|
||||
|
||||
|
|
|
@ -209,19 +209,7 @@ SELECT result FROM run_command_on_all_nodes(
|
|||
CREATE USER "role-needs\!escape";
|
||||
CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape";
|
||||
-- Rename it to make check_database_on_all_nodes happy.
|
||||
-- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually.
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape
|
||||
$$
|
||||
);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
ALTER DATABASE
|
||||
ALTER DATABASE
|
||||
ALTER DATABASE
|
||||
(3 rows)
|
||||
|
||||
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape;
|
||||
SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type;
|
||||
node_type | result
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE SCHEMA failure_non_main_db_2pc;
|
||||
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
|
||||
CREATE DATABASE other_db1;
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||
HINT: You can manually create a database and its extensions on workers.
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c other_db1
|
||||
CREATE USER user_1;
|
||||
\c regression
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
|
||||
nodeid | result
|
||||
---------------------------------------------------------------------
|
||||
0 | user_1
|
||||
1 | user_1
|
||||
2 |
|
||||
(3 rows)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
|
||||
nodeid | result
|
||||
---------------------------------------------------------------------
|
||||
0 | user_1
|
||||
1 | user_1
|
||||
2 | user_1
|
||||
(3 rows)
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c other_db1
|
||||
CREATE USER user_2;
|
||||
ERROR: connection not open
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
while executing command on localhost:xxxxx
|
||||
\c regression
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
|
||||
nodeid | result
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
2 |
|
||||
(3 rows)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
|
||||
nodeid | result
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
2 |
|
||||
(3 rows)
|
||||
|
||||
DROP DATABASE other_db1;
|
||||
-- user_2 should not exist because the query to create it will fail
|
||||
-- but let's make sure we try to drop it just in case
|
||||
DROP USER IF EXISTS user_1, user_2;
|
||||
NOTICE: role "user_2" does not exist, skipping
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
CREATE DATABASE other_db2;
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||
HINT: You can manually create a database and its extensions on workers.
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c other_db2
|
||||
CREATE USER user_3;
|
||||
\c regression
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
|
||||
user_3
|
||||
user_3
|
||||
(3 rows)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
user_3
|
||||
user_3
|
||||
user_3
|
||||
(3 rows)
|
||||
|
||||
DROP DATABASE other_db2;
|
||||
DROP USER user_3;
|
||||
\c - - - :master_port
|
||||
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
DELETE 1
|
||||
DELETE 1
|
||||
DELETE 1
|
||||
(3 rows)
|
||||
|
||||
DROP SCHEMA failure_non_main_db_2pc;
|
|
@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
-- Snapshot of state at 12.2-1
|
||||
ALTER EXTENSION citus UPDATE TO '12.2-1';
|
||||
SELECT * FROM multi_extension.print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
| function citus_internal.commit_management_command_2pc() void
|
||||
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
|
||||
| function citus_internal.mark_object_distributed(oid,text,oid) void
|
||||
| function citus_internal.start_management_transaction(xid8) void
|
||||
| function citus_internal_database_command(text) void
|
||||
(1 row)
|
||||
(5 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -64,7 +64,7 @@ SELECT recover_prepared_transactions();
|
|||
(1 row)
|
||||
|
||||
-- delete the citus_122_should_do_nothing transaction
|
||||
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
|
||||
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
|
||||
groupid | gid
|
||||
---------------------------------------------------------------------
|
||||
122 | citus_122_should_do_nothing
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
CREATE SCHEMA other_databases;
|
||||
SET search_path TO other_databases;
|
||||
SET citus.next_shard_id TO 10231023;
|
||||
CREATE DATABASE other_db1;
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||
HINT: You can manually create a database and its extensions on workers.
|
||||
\c other_db1
|
||||
SHOW citus.main_db;
|
||||
citus.main_db
|
||||
---------------------------------------------------------------------
|
||||
regression
|
||||
(1 row)
|
||||
|
||||
-- check that empty citus.superuser gives error
|
||||
SET citus.superuser TO '';
|
||||
CREATE USER empty_superuser;
|
||||
ERROR: No superuser role is given for Citus main database connection
|
||||
HINT: Set citus.superuser to a superuser role name
|
||||
SET citus.superuser TO 'postgres';
|
||||
CREATE USER other_db_user1;
|
||||
CREATE USER other_db_user2;
|
||||
BEGIN;
|
||||
CREATE USER other_db_user3;
|
||||
CREATE USER other_db_user4;
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
CREATE USER other_db_user5;
|
||||
CREATE USER other_db_user6;
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
CREATE USER other_db_user7;
|
||||
SELECT 1/0;
|
||||
ERROR: division by zero
|
||||
COMMIT;
|
||||
CREATE USER other_db_user8;
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
other_db_user1
|
||||
other_db_user2
|
||||
other_db_user3
|
||||
other_db_user4
|
||||
other_db_user8
|
||||
(5 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
other_db_user1
|
||||
other_db_user2
|
||||
other_db_user3
|
||||
other_db_user4
|
||||
other_db_user8
|
||||
(5 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- some user creation commands will fail but let's make sure we try to drop them just in case
|
||||
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
|
||||
NOTICE: role "other_db_user5" does not exist, skipping
|
||||
NOTICE: role "other_db_user6" does not exist, skipping
|
||||
NOTICE: role "other_db_user7" does not exist, skipping
|
||||
-- Make sure non-superuser roles cannot use internal GUCs
|
||||
-- but they can still create a role
|
||||
CREATE USER nonsuperuser CREATEROLE;
|
||||
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
|
||||
SET ROLE nonsuperuser;
|
||||
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
|
||||
ERROR: operation is not allowed
|
||||
HINT: Run the command with a superuser.
|
||||
\c other_db1
|
||||
CREATE USER other_db_user9;
|
||||
RESET ROLE;
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
other_db_user9
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
other_db_user9
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
|
||||
DROP USER other_db_user9, nonsuperuser;
|
||||
-- test from a worker
|
||||
\c - - - :worker_1_port
|
||||
CREATE DATABASE other_db2;
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||
HINT: You can manually create a database and its extensions on workers.
|
||||
\c other_db2
|
||||
CREATE USER worker_user1;
|
||||
BEGIN;
|
||||
CREATE USER worker_user2;
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
CREATE USER worker_user3;
|
||||
ROLLBACK;
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
worker_user1
|
||||
worker_user2
|
||||
(2 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
|
||||
usename
|
||||
---------------------------------------------------------------------
|
||||
worker_user1
|
||||
worker_user2
|
||||
(2 rows)
|
||||
|
||||
-- some user creation commands will fail but let's make sure we try to drop them just in case
|
||||
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
|
||||
NOTICE: role "worker_user3" does not exist, skipping
|
||||
\c - - - :worker_1_port
|
||||
DROP DATABASE other_db2;
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA other_databases;
|
||||
DROP DATABASE other_db1;
|
|
@ -56,13 +56,17 @@ ORDER BY 1;
|
|||
function citus_get_active_worker_nodes()
|
||||
function citus_get_node_clock()
|
||||
function citus_get_transaction_clock()
|
||||
function citus_internal.commit_management_command_2pc()
|
||||
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
|
||||
function citus_internal.find_groupid_for_node(text,integer)
|
||||
function citus_internal.mark_object_distributed(oid,text,oid)
|
||||
function citus_internal.pg_dist_node_trigger_func()
|
||||
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
|
||||
function citus_internal.pg_dist_shard_placement_trigger_func()
|
||||
function citus_internal.refresh_isolation_tester_prepared_statement()
|
||||
function citus_internal.replace_isolation_tester_func()
|
||||
function citus_internal.restore_isolation_tester_func()
|
||||
function citus_internal.start_management_transaction(xid8)
|
||||
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
|
||||
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
|
||||
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
|
||||
|
@ -344,5 +348,5 @@ ORDER BY 1;
|
|||
view citus_stat_tenants_local
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(334 rows)
|
||||
(338 rows)
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ test: failure_multi_row_insert
|
|||
test: failure_mx_metadata_sync
|
||||
test: failure_mx_metadata_sync_multi_trans
|
||||
test: failure_connection_establishment
|
||||
test: failure_non_main_db_2pc
|
||||
|
||||
# this test syncs metadata to the workers
|
||||
test: failure_failover_to_local_execution
|
||||
|
|
|
@ -108,6 +108,7 @@ test: object_propagation_debug
|
|||
test: undistribute_table
|
||||
test: run_command_on_all_nodes
|
||||
test: background_task_queue_monitor
|
||||
test: other_databases
|
||||
|
||||
# Causal clock test
|
||||
test: clock
|
||||
|
|
|
@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
|
|||
push(@pgOptions, "citus.enable_change_data_capture=on");
|
||||
push(@pgOptions, "citus.stat_tenants_limit = 2");
|
||||
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
|
||||
push(@pgOptions, "citus.main_db = 'regression'");
|
||||
push(@pgOptions, "citus.superuser = 'postgres'");
|
||||
|
||||
# Some tests look at shards in pg_class, make sure we can usually see them:
|
||||
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
|
||||
|
|
|
@ -1,20 +1,12 @@
|
|||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
|
||||
|
||||
-- since ALLOW_CONNECTIONS alter option should be executed in a different database
|
||||
-- and since we don't have a multiple database support for now,
|
||||
-- this statement will get error
|
||||
alter database regression ALLOW_CONNECTIONS false;
|
||||
|
||||
|
||||
alter database regression with CONNECTION LIMIT 100;
|
||||
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
alter database regression with CONNECTION LIMIT -1;
|
||||
alter database regression with IS_TEMPLATE true;
|
||||
alter database regression with IS_TEMPLATE false;
|
||||
-- this statement will get error since we don't have a multiple database support for now
|
||||
alter database regression rename to regression2;
|
||||
|
||||
|
||||
alter database regression set default_transaction_read_only = true;
|
||||
|
||||
|
@ -56,4 +48,66 @@ alter database regression set lock_timeout from current;
|
|||
alter database regression set lock_timeout to DEFAULT;
|
||||
alter database regression RESET lock_timeout;
|
||||
|
||||
set citus.enable_create_database_propagation=on;
|
||||
create database "regression!'2";
|
||||
alter database "regression!'2" with CONNECTION LIMIT 100;
|
||||
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
alter database "regression!'2" with IS_TEMPLATE false;
|
||||
|
||||
|
||||
|
||||
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
|
||||
\c - - - :worker_1_port
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
|
||||
\c - - - :worker_2_port
|
||||
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5'
|
||||
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
set citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%ALTER DATABASE%';
|
||||
|
||||
alter database "regression!'2" set TABLESPACE alter_db_tablespace;
|
||||
|
||||
set citus.enable_create_database_propagation=on;
|
||||
alter database "regression!'2" rename to regression3;
|
||||
|
||||
-- check that the local database rename and alter comnmand is not propagated
|
||||
set citus.enable_create_database_propagation=off;
|
||||
CREATE database local_regression;
|
||||
|
||||
alter DATABASE local_regression with CONNECTION LIMIT 100;
|
||||
alter DATABASE local_regression rename to local_regression2;
|
||||
drop database local_regression2;
|
||||
|
||||
set citus.enable_create_database_propagation=on;
|
||||
|
||||
drop database regression3;
|
||||
|
||||
create database "regression!'4";
|
||||
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
|
||||
$$
|
||||
);
|
||||
|
||||
alter database "regression!'4" set TABLESPACE "ts-needs\!escape";
|
||||
|
||||
drop database "regression!'4";
|
||||
|
||||
set citus.log_remote_commands = false;
|
||||
set citus.enable_create_database_propagation=off;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
drop tablespace "ts-needs\!escape"
|
||||
$$
|
||||
);
|
||||
|
|
|
@ -129,13 +129,8 @@ CREATE USER "role-needs\!escape";
|
|||
CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape";
|
||||
|
||||
-- Rename it to make check_database_on_all_nodes happy.
|
||||
-- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually.
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape
|
||||
$$
|
||||
);
|
||||
|
||||
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape;
|
||||
SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type;
|
||||
|
||||
-- test database syncing after node addition
|
||||
|
@ -541,6 +536,7 @@ REVOKE CONNECT ON DATABASE test_db FROM propagated_role;
|
|||
DROP DATABASE test_db;
|
||||
DROP ROLE propagated_role, non_propagated_role;
|
||||
|
||||
|
||||
--clean up resources created by this test
|
||||
|
||||
-- DROP TABLESPACE is not supported, so we need to drop it manually.
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
CREATE SCHEMA failure_non_main_db_2pc;
|
||||
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
|
||||
|
||||
CREATE DATABASE other_db1;
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
|
||||
|
||||
\c other_db1
|
||||
|
||||
CREATE USER user_1;
|
||||
|
||||
\c regression
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
|
||||
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
|
||||
|
||||
\c other_db1
|
||||
|
||||
CREATE USER user_2;
|
||||
|
||||
\c regression
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
|
||||
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
|
||||
|
||||
DROP DATABASE other_db1;
|
||||
-- user_2 should not exist because the query to create it will fail
|
||||
-- but let's make sure we try to drop it just in case
|
||||
DROP USER IF EXISTS user_1, user_2;
|
||||
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
\c - - - :worker_1_port
|
||||
|
||||
CREATE DATABASE other_db2;
|
||||
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
|
||||
|
||||
\c other_db2
|
||||
|
||||
CREATE USER user_3;
|
||||
|
||||
\c regression
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
|
||||
|
||||
DROP DATABASE other_db2;
|
||||
DROP USER user_3;
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
|
||||
|
||||
DROP SCHEMA failure_non_main_db_2pc;
|
|
@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing');
|
|||
SELECT recover_prepared_transactions();
|
||||
|
||||
-- delete the citus_122_should_do_nothing transaction
|
||||
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
|
||||
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
|
||||
ROLLBACK PREPARED 'citus_122_should_do_nothing';
|
||||
|
||||
SELECT count(*) FROM pg_dist_transaction;
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
CREATE SCHEMA other_databases;
|
||||
SET search_path TO other_databases;
|
||||
|
||||
SET citus.next_shard_id TO 10231023;
|
||||
|
||||
CREATE DATABASE other_db1;
|
||||
|
||||
\c other_db1
|
||||
SHOW citus.main_db;
|
||||
|
||||
-- check that empty citus.superuser gives error
|
||||
SET citus.superuser TO '';
|
||||
CREATE USER empty_superuser;
|
||||
SET citus.superuser TO 'postgres';
|
||||
|
||||
CREATE USER other_db_user1;
|
||||
CREATE USER other_db_user2;
|
||||
|
||||
BEGIN;
|
||||
CREATE USER other_db_user3;
|
||||
CREATE USER other_db_user4;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE USER other_db_user5;
|
||||
CREATE USER other_db_user6;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
CREATE USER other_db_user7;
|
||||
SELECT 1/0;
|
||||
COMMIT;
|
||||
|
||||
CREATE USER other_db_user8;
|
||||
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
|
||||
\c - - - :master_port
|
||||
-- some user creation commands will fail but let's make sure we try to drop them just in case
|
||||
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
|
||||
|
||||
-- Make sure non-superuser roles cannot use internal GUCs
|
||||
-- but they can still create a role
|
||||
CREATE USER nonsuperuser CREATEROLE;
|
||||
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
|
||||
SET ROLE nonsuperuser;
|
||||
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
|
||||
|
||||
\c other_db1
|
||||
CREATE USER other_db_user9;
|
||||
|
||||
RESET ROLE;
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
|
||||
|
||||
\c - - - :master_port
|
||||
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
|
||||
DROP USER other_db_user9, nonsuperuser;
|
||||
|
||||
-- test from a worker
|
||||
\c - - - :worker_1_port
|
||||
|
||||
CREATE DATABASE other_db2;
|
||||
|
||||
\c other_db2
|
||||
|
||||
CREATE USER worker_user1;
|
||||
|
||||
BEGIN;
|
||||
CREATE USER worker_user2;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE USER worker_user3;
|
||||
ROLLBACK;
|
||||
|
||||
\c regression
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
|
||||
|
||||
\c - - - :master_port
|
||||
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
|
||||
|
||||
-- some user creation commands will fail but let's make sure we try to drop them just in case
|
||||
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
DROP DATABASE other_db2;
|
||||
\c - - - :master_port
|
||||
|
||||
DROP SCHEMA other_databases;
|
||||
DROP DATABASE other_db1;
|
Loading…
Reference in New Issue