Merge branch 'main' into fix_timeout

pull/7377/head
LightDB Enterprise Postgres 2023-12-29 17:13:02 +08:00 committed by GitHub
commit 20c3e057be
59 changed files with 2075 additions and 172 deletions

View File

@ -192,6 +192,25 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
}
/*
* IsSetTablespaceStatement returns true if the statement is a SET TABLESPACE statement,
* false otherwise.
*/
static bool
IsSetTablespaceStatement(AlterDatabaseStmt *stmt)
{
DefElem *def = NULL;
foreach_ptr(def, stmt->options)
{
if (strcmp(def->defname, "tablespace") == 0)
{
return true;
}
}
return false;
}
/*
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
* postgres instance.
@ -203,22 +222,38 @@ List *
PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
bool missingOk = false;
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname,
missingOk);
if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
{
return NIL;
}
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
if (IsSetTablespaceStatement(stmt))
{
/*
Set tablespace does not work inside a transaction. Therefore, we need to use
NontransactionalNodeDDLTaskList to run the command on the workers outside
the transaction block.
*/
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
else
{
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
}
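For illustration, a minimal sketch of the two propagation paths above (database and tablespace names are made up, not taken from this diff):
-- SET TABLESPACE cannot run inside a transaction block, so it is sent to the
-- workers via NontransactionalNodeDDLTaskList:
ALTER DATABASE app_db SET TABLESPACE fast_ssd;
-- plain WITH options keep using the transactional NodeDDLTaskList path:
ALTER DATABASE app_db WITH ALLOW_CONNECTIONS true CONNECTION LIMIT 50;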
@ -256,6 +291,36 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
#endif
/*
* PostprocessAlterDatabaseRenameStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the ALTER DATABASE RENAME statement to be run on
* all workers.
*/
List *
PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString)
{
bool missingOk = false;
RenameStmt *stmt = castNode(RenameStmt, node);
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->newname,
missingOk);
if (!ShouldPropagate() || !IsAnyObjectDistributed(list_make1(dbAddress)))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
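A usage sketch of the rename propagation added here (database names are illustrative only):
-- run on the coordinator; after the local rename succeeds, the postprocess hook
-- deparses the same statement and runs it on the workers:
ALTER DATABASE analytics RENAME TO analytics_v2;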
/*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance.

View File

@ -31,20 +31,90 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
typedef enum RequiredObjectSet
{
REQUIRE_ONLY_DEPENDENCIES = 1,
REQUIRE_OBJECT_AND_DEPENDENCIES = 2,
} RequiredObjectSet;
static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b);
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
static void EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static bool ShouldPropagateObject(const ObjectAddress *address);
static char * DropTableIfExistsCommand(Oid relationId);
/*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes. If not available they will be created on the
* nodes via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
* EnsureObjectAndDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectSetExistOnAllNodes to ensure the "object itself" (together
* with its dependencies) is available on all nodes.
*
* Unlike EnsureDependenciesExistOnAllNodes, we return early if the
* target object is distributed already.
*
* The reason why we don't do the same in EnsureDependenciesExistOnAllNodes
* is that it is also used when altering an object, and hence the target object
* may have just gained a dependency that needs to be propagated now. For example,
* when "GRANT non_dist_role TO dist_role" is executed, we need to propagate
* "non_dist_role" to all nodes before propagating the "GRANT" command itself.
* For this reason, we call EnsureDependenciesExistOnAllNodes for "dist_role"
* and it would automatically discover that "non_dist_role" is a dependency of
* "dist_role" and propagate it beforehand.
*
* However, when we're requested to create the target object itself (and
* implicitly its dependencies), we're sure that we're not altering the target
* object itself, hence we can return early if the target object is already
* distributed. This is the case, for example, when
* "REASSIGN OWNED BY dist_role TO non_dist_role" is executed. In that case,
* "non_dist_role" is not a dependency of "dist_role" but we want to distribute
* "non_dist_role" beforehand and we call this function for "non_dist_role",
* not for "dist_role".
*
* See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
void
EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target)
{
if (IsAnyObjectDistributed(list_make1((ObjectAddress *) target)))
{
return;
}
EnsureRequiredObjectSetExistOnAllNodes(target, REQUIRE_OBJECT_AND_DEPENDENCIES);
}
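A short SQL sketch of the two cases described in the comment above (role names come from that comment and are illustrative):
-- ALTER path: EnsureDependenciesExistOnAllNodes discovers that non_dist_role has
-- become a dependency of dist_role and propagates it before the GRANT itself:
GRANT non_dist_role TO dist_role;
-- CREATE path: non_dist_role is not a dependency of dist_role, so the caller asks
-- for the object itself plus its dependencies and can return early if it is
-- already distributed:
REASSIGN OWNED BY dist_role TO non_dist_role;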
/*
* EnsureDependenciesExistOnAllNodes is a wrapper around
* EnsureRequiredObjectSetExistOnAllNodes to ensure "all dependencies" of given
* object --but not the object itself-- are available on all nodes.
*
* See EnsureRequiredObjectSetExistOnAllNodes to learn more about how this
* function deals with an object created within the same transaction.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureRequiredObjectSetExistOnAllNodes(target, REQUIRE_ONLY_DEPENDENCIES);
}
/*
* EnsureRequiredObjectSetExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all nodes if the required object set is REQUIRE_ONLY_DEPENDENCIES.
* Otherwise, i.e., if the required object set is REQUIRE_OBJECT_AND_DEPENDENCIES, then this
* function creates the object itself on all nodes too. This function ensures that each
* of the dependencies is supported by Citus but doesn't check the same for the target
* object itself (when REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we assume that
* callers don't call this function for an unsupported object in the first place.
*
* If not available, they will be created on the nodes via a separate session that will be
* committed directly so that the objects are visible to potentially multiple sessions creating
* the shards.
*
* Note: only the actual objects are created via a separate session; the records in
* pg_dist_object are created in this session. As a side effect the objects could be
@ -55,29 +125,52 @@ static char * DropTableIfExistsCommand(Oid relationId);
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
static void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
EnsureRequiredObjectSetExistOnAllNodes(const ObjectAddress *target,
RequiredObjectSet requiredObjectSet)
{
List *dependenciesWithCommands = NIL;
Assert(requiredObjectSet == REQUIRE_ONLY_DEPENDENCIES ||
requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES);
List *objectsWithCommands = NIL;
List *ddlCommands = NULL;
/*
* If any unsupported dependency or circular dependency exists, Citus cannot
* ensure the dependencies will exist on all nodes.
*
* Note that we don't check whether "target" is distributable (in case
* REQUIRE_OBJECT_AND_DEPENDENCIES is provided) because we expect callers
* to not even call this function if Citus doesn't know how to propagate
* "target" object itself.
*/
EnsureDependenciesCanBeDistributed(target);
/* collect all dependencies in creation order and get their ddl commands */
List *dependencies = GetDependenciesForObject(target);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
List *objectsToBeCreated = GetDependenciesForObject(target);
/*
* Append the target object to make sure that it's created after its
* dependencies are created, if requested.
*/
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
objectsToBeCreated = lappend(objectsToBeCreated, targetCopy);
}
ObjectAddress *object = NULL;
foreach_ptr(object, objectsToBeCreated)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(object);
ddlCommands = list_concat(ddlCommands, dependencyCommands);
/* create a new list with dependencies that actually created commands */
/* create a new list with objects that actually created commands */
if (list_length(dependencyCommands) > 0)
{
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
objectsWithCommands = lappend(objectsWithCommands, object);
}
}
if (list_length(ddlCommands) <= 0)
@ -100,26 +193,28 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);
/*
* Lock dependent objects explicitly to make sure same DDL command won't be sent
* Lock objects to be created explicitly to make sure same DDL command won't be sent
* multiple times from parallel sessions.
*
* Sort dependencies that will be created on workers to not to have any deadlock
* Sort the objects that will be created on workers to not to have any deadlock
* issue if different sessions are creating different objects.
*/
List *addressSortedDependencies = SortList(dependenciesWithCommands,
List *addressSortedDependencies = SortList(objectsWithCommands,
ObjectAddressComparator);
foreach_ptr(dependency, addressSortedDependencies)
foreach_ptr(object, addressSortedDependencies)
{
LockDatabaseObject(dependency->classId, dependency->objectId,
dependency->objectSubId, ExclusiveLock);
LockDatabaseObject(object->classId, object->objectId,
object->objectSubId, ExclusiveLock);
}
/*
* We need to propagate dependencies via the current user's metadata connection if
* any dependency for the target is created in the current transaction. Our assumption
* is that if we rely on a dependency created in the current transaction, then the
* current user, most probably, has permissions to create the target object as well.
* We need to propagate objects via the current user's metadata connection if
* any of the objects that we're interested in are created in the current transaction.
* Our assumption is that if we rely on an object created in the current transaction,
* then the current user, most probably, has permissions to create the target object
* as well.
*
* Note that the user may still not be able to create the target due to lack of
* permissions on some of its dependencies. But this is ok since it should be rare.
*
@ -127,7 +222,18 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* have visibility issues since propagated dependencies would be invisible to
* the separate connection until we locally commit.
*/
if (HasAnyDependencyInPropagatedObjects(target))
List *createdObjectList = GetAllSupportedDependenciesForObject(target);
/* consider target as well if we're requested to create it too */
if (requiredObjectSet == REQUIRE_OBJECT_AND_DEPENDENCIES)
{
ObjectAddress *targetCopy = palloc(sizeof(ObjectAddress));
*targetCopy = *target;
createdObjectList = lappend(createdObjectList, targetCopy);
}
if (HasAnyObjectInPropagatedObjects(createdObjectList))
{
SendCommandListToRemoteNodesWithMetadata(ddlCommands);
}
@ -150,7 +256,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail.
*/
foreach_ptr(dependency, dependenciesWithCommands)
foreach_ptr(object, objectsWithCommands)
{
/*
* pg_dist_object entries must be propagated with the super user, since
@ -160,7 +266,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* Only dependent object's metadata should be propagated with super user.
* Metadata of the table itself must be propagated with the current user.
*/
MarkObjectDistributedViaSuperUser(dependency);
MarkObjectDistributedViaSuperUser(object);
}
}

View File

@ -275,6 +275,17 @@ static DistributeObjectOps Any_CreateRole = {
.address = CreateRoleStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Any_ReassignOwned = {
.deparse = DeparseReassignOwnedStmt,
.qualify = NULL,
.preprocess = NULL,
.postprocess = PostprocessReassignOwnedStmt,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Any_DropOwned = {
.deparse = DeparseDropOwnedStmt,
.qualify = NULL,
@ -522,6 +533,16 @@ static DistributeObjectOps Database_Set = {
.markDistributed = false,
};
static DistributeObjectOps Database_Rename = {
.deparse = DeparseAlterDatabaseRenameStmt,
.qualify = NULL,
.preprocess = NULL,
.postprocess = PostprocessAlterDatabaseRenameStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_ALTER,
.address = NULL,
.markDistributed = false,
};
static DistributeObjectOps Domain_Alter = {
.deparse = DeparseAlterDomainStmt,
@ -1868,6 +1889,11 @@ GetDistributeObjectOps(Node *node)
return &Any_DropOwned;
}
case T_ReassignOwnedStmt:
{
return &Any_ReassignOwned;
}
case T_DropStmt:
{
DropStmt *stmt = castNode(DropStmt, node);
@ -2087,6 +2113,11 @@ GetDistributeObjectOps(Node *node)
return &Collation_Rename;
}
case OBJECT_DATABASE:
{
return &Database_Rename;
}
case OBJECT_DOMAIN:
{
return &Domain_Rename;

View File

@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
NIL,
distArgumentIndexList,
colocationIdList,
forceDelegationList);

View File

@ -48,6 +48,9 @@
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
static ObjectAddress * GetNewRoleAddress(ReassignOwnedStmt *stmt);
/*
* PreprocessDropOwnedStmt finds the distributed roles out of the ones
* being dropped, unmarks them as distributed, and creates the drop statements
@ -89,3 +92,81 @@ PreprocessDropOwnedStmt(Node *node, const char *queryString,
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessReassignOwnedStmt takes a Node pointer representing a REASSIGN
* OWNED statement and performs any necessary post-processing after the statement
* has been executed locally.
*
* We filter out local roles in OWNED BY clause before deparsing the command,
* meaning that we skip reassigning what is owned by local roles. However,
* if the role specified in TO clause is local, we automatically distribute
* it before deparsing the command.
*/
List *
PostprocessReassignOwnedStmt(Node *node, const char *queryString)
{
ReassignOwnedStmt *stmt = castNode(ReassignOwnedStmt, node);
List *allReassignRoles = stmt->roles;
List *distributedReassignRoles = FilterDistributedRoles(allReassignRoles);
if (list_length(distributedReassignRoles) <= 0)
{
return NIL;
}
if (!ShouldPropagate())
{
return NIL;
}
EnsureCoordinator();
stmt->roles = distributedReassignRoles;
char *sql = DeparseTreeNode((Node *) stmt);
stmt->roles = allReassignRoles;
ObjectAddress *newRoleAddress = GetNewRoleAddress(stmt);
/*
* We temporarily enable create / alter role propagation to properly
* propagate the role specified in TO clause.
*/
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_create_role_propagation", "on",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
set_config_option("citus.enable_alter_role_propagation", "on",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
set_config_option("citus.enable_alter_role_set_propagation", "on",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
EnsureObjectAndDependenciesExistOnAllNodes(newRoleAddress);
/* rollback GUCs to the state before this session */
AtEOXact_GUC(true, saveNestLevel);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* GetNewRoleAddress returns the ObjectAddress of the new role
*/
static ObjectAddress *
GetNewRoleAddress(ReassignOwnedStmt *stmt)
{
Oid roleOid = get_role_oid(stmt->newrole->rolename, false);
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*address, AuthIdRelationId, roleOid);
return address;
}
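A sketch of the OWNED BY filtering described in PostprocessReassignOwnedStmt above (role names are made up):
-- locally this reassigns everything; before propagation Citus drops local roles
-- from the OWNED BY list and first distributes a local role used in the TO clause,
-- so the workers roughly receive: REASSIGN OWNED BY dist_role_1 TO local_role_2;
REASSIGN OWNED BY dist_role_1, local_role_1 TO local_role_2;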

View File

@ -34,6 +34,7 @@
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
@ -44,6 +45,7 @@
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -77,6 +79,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_management.h"
@ -84,6 +87,13 @@
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d)"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree);
static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
static void RunPostprocessMainDBCommand(Node *parsetree);
/*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
@ -243,6 +255,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
if (!CitusHasBeenLoaded())
{
if (!IsMainDB)
{
RunPreprocessMainDBCommand(parsetree, queryString);
}
/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
@ -250,6 +267,11 @@ citus_ProcessUtility(PlannedStmt *pstmt,
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);
if (!IsMainDB)
{
RunPostprocessMainDBCommand(parsetree);
}
return;
}
else if (IsA(parsetree, CallStmt))
@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}
/*
* RunPreprocessMainDBCommand runs the necessary commands for a query in the main
* database before the query is run on the local node with PrevProcessUtility.
*/
static void
RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION,
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);
mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
quote_literal_cstr(queryString),
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
}
/*
* RunPostprocessMainDBCommand runs the necessary commands for a query in the main
* database after the query is run on the local node with PrevProcessUtility.
*/
static void
RunPostprocessMainDBCommand(Node *parsetree)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree);
Oid roleOid = get_role_oid(createRoleStmt->role, false);
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role),
roleOid);
RunCitusMainDBQuery(mainDBQuery->data);
}
}
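Putting the two hooks together, the main database roughly receives the following when CREATE ROLE is executed from another database (a hedged sketch; the transaction id, role name and role OID are placeholders, 1260 is pg_authid's relation OID):
SELECT citus_internal.start_management_transaction('12345');
SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE ROLE app_user', 'postgres');
-- after the local CREATE ROLE succeeds in the originating database:
SELECT citus_internal.mark_object_distributed(1260, 'app_user', 99999);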

View File

@ -425,11 +425,13 @@ GetConnParam(const char *keyword)
/*
* GetEffectiveConnKey checks whether there is any pooler configuration for the
* provided key (host/port combination). The one case where this logic is not
* applied is for loopback connections originating within the task tracker. If
* a corresponding row is found in the poolinfo table, a modified (effective)
* key is returned with the node, port, and dbname overridden, as applicable,
* otherwise, the original key is returned unmodified.
* provided key (host/port combination). If a corresponding row is found in the
* poolinfo table, a modified (effective) key is returned with the node, port,
* and dbname overridden, as applicable, otherwise, the original key is returned
* unmodified.
*
* In the case of Citus non-main databases we just return the key, since we
* would not have access to tables with worker information.
*/
ConnectionHashKey *
GetEffectiveConnKey(ConnectionHashKey *key)
@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
return key;
}
if (!CitusHasBeenLoaded())
{
/*
* This happens when we connect to the main database over localhost
* from some non-Citus database.
*/
return key;
}
WorkerNode *worker = FindWorkerNode(key->hostname, key->port);
if (worker == NULL)
{
/* this can be hit when the key references an unknown node */

View File

@ -30,12 +30,14 @@
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt);
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendDefElemConnLimit(StringInfo buf, DefElem *def);
static void AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt);
static void AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt);
static void AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt);
static void AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendGrantDatabases(StringInfo buf, GrantStmt *stmt);
static void AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname);
const DefElemOptionFormat create_database_option_formats[] = {
const DefElemOptionFormat createDatabaseOptionFormats[] = {
{ "owner", " OWNER %s", OPTION_FORMAT_STRING },
{ "template", " TEMPLATE %s", OPTION_FORMAT_STRING },
{ "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR },
@ -53,6 +55,14 @@ const DefElemOptionFormat create_database_option_formats[] = {
{ "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN }
};
const DefElemOptionFormat alterDatabaseOptionFormats[] = {
{ "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN },
{ "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN },
{ "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER },
};
char *
DeparseAlterDatabaseOwnerStmt(Node *node)
{
@ -112,48 +122,63 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
static void
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
{
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def));
if (list_length(stmt->options) == 0)
{
elog(ERROR, "got unexpected number of options for ALTER DATABASE");
}
if (stmt->options)
{
DefElem *firstOption = linitial(stmt->options);
if (strcmp(firstOption->defname, "tablespace") == 0)
{
AppendAlterDatabaseSetTablespace(buf, firstOption, stmt->dbname);
/* SET tablespace cannot be combined with other options */
return;
}
appendStringInfo(buf, "ALTER DATABASE %s WITH",
quote_identifier(stmt->dbname));
AppendBasicAlterDatabaseOptions(buf, stmt);
}
appendStringInfo(buf, ";");
}
static void
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
AppendAlterDatabaseSetTablespace(StringInfo buf, DefElem *def, char *dbname)
{
appendStringInfo(buf, "ALTER DATABASE %s ", quote_identifier(stmt->dbname));
appendStringInfo(buf,
"ALTER DATABASE %s SET TABLESPACE %s",
quote_identifier(dbname), quote_identifier(defGetString(def)));
}
if (stmt->options)
/*
* AppendBasicAlterDatabaseOptions appends basic ALTER DATABASE options to a string buffer.
* Basic options are those that can be appended to the ALTER DATABASE statement
* after the "WITH" keyword.(i.e. ALLOW_CONNECTIONS, CONNECTION LIMIT, IS_TEMPLATE)
* For example, the tablespace option is not a basic option since it is defined via SET keyword.
*
* This function takes a string buffer and an AlterDatabaseStmt as input.
* It appends the basic options to the string buffer.
*
*/
static void
AppendBasicAlterDatabaseOptions(StringInfo buf, AlterDatabaseStmt *stmt)
{
DefElem *def = NULL;
foreach_ptr(def, stmt->options)
{
ListCell *cell = NULL;
appendStringInfo(buf, "WITH ");
foreach(cell, stmt->options)
{
DefElem *def = castNode(DefElem, lfirst(cell));
if (strcmp(def->defname, "is_template") == 0)
{
appendStringInfo(buf, "IS_TEMPLATE %s",
quote_literal_cstr(strVal(def->arg)));
}
else if (strcmp(def->defname, "connection_limit") == 0)
{
AppendDefElemConnLimit(buf, def);
}
else if (strcmp(def->defname, "allow_connections") == 0)
{
ereport(ERROR,
errmsg("ALLOW_CONNECTIONS is not supported"));
}
else
{
ereport(ERROR,
errmsg("unrecognized ALTER DATABASE option: %s",
def->defname));
}
}
DefElemOptionToStatement(buf, def, alterDatabaseOptionFormats, lengthof(
alterDatabaseOptionFormats));
}
appendStringInfo(buf, ";");
}
@ -216,6 +241,22 @@ AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt)
}
char *
DeparseAlterDatabaseRenameStmt(Node *node)
{
RenameStmt *stmt = (RenameStmt *) node;
StringInfoData str;
initStringInfo(&str);
appendStringInfo(&str, "ALTER DATABASE %s RENAME TO %s",
quote_identifier(stmt->subname),
quote_identifier(stmt->newname));
return str.data;
}
char *
DeparseAlterDatabaseSetStmt(Node *node)
{
@ -246,8 +287,8 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt)
DefElem *option = NULL;
foreach_ptr(option, stmt->options)
{
DefElemOptionToStatement(buf, option, create_database_option_formats,
lengthof(create_database_option_formats));
DefElemOptionToStatement(buf, option, createDatabaseOptionFormats,
lengthof(createDatabaseOptionFormats));
}
}

View File

@ -71,7 +71,7 @@ AppendRoleList(StringInfo buf, List *roleList)
{
Node *roleNode = (Node *) lfirst(cell);
Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv));
char const *rolename = NULL;
const char *rolename = NULL;
if (IsA(roleNode, RoleSpec))
{
rolename = RoleSpecString((RoleSpec *) roleNode, true);
@ -83,3 +83,27 @@ AppendRoleList(StringInfo buf, List *roleList)
}
}
}
static void
AppendReassignOwnedStmt(StringInfo buf, ReassignOwnedStmt *stmt)
{
appendStringInfo(buf, "REASSIGN OWNED BY ");
AppendRoleList(buf, stmt->roles);
const char *newRoleName = RoleSpecString(stmt->newrole, true);
appendStringInfo(buf, " TO %s", newRoleName);
}
char *
DeparseReassignOwnedStmt(Node *node)
{
ReassignOwnedStmt *stmt = castNode(ReassignOwnedStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
AppendReassignOwnedStmt(&buf, stmt);
return buf.data;
}

View File

@ -49,18 +49,42 @@
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress,
char *objectName);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
static bool IsObjectDistributed(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(mark_object_distributed);
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
/*
* mark_object_distributed adds an object to pg_dist_object
* in all of the nodes.
*/
Datum
mark_object_distributed(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
Oid classId = PG_GETARG_OID(0);
text *objectNameText = PG_GETARG_TEXT_P(1);
char *objectName = text_to_cstring(objectNameText);
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);
PG_RETURN_VOID();
}
/*
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
*
@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address)
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
MarkObjectDistributedWithName(distAddress, "");
}
/*
* MarkObjectDistributedWithName marks an object as a distributed object.
* Same as MarkObjectDistributed but this function also allows passing an objectName
* that is used in case the object does not exist for the current transaction.
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
{
if (!CitusHasBeenLoaded())
{
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
}
MarkObjectDistributedLocally(distAddress);
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
}
}
@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, "");
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}
@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId)
* for the given object address.
*/
static char *
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName)
{
/* create a single-element list; cast away const to avoid a compiler warning */
List *objectAddressList =
list_make1((ObjectAddress *) objectAddress);
/* names also require a list so we create a nested list here */
List *objectNameList = list_make1(list_make1((char *) objectName));
List *distArgumentIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
objectNameList,
distArgumentIndexList,
colocationIdList,
forceDelegationList);

View File

@ -79,6 +79,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/utils/array_type.h"
@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
/*
* Citus will not be loaded when we run a global DDL command from a
* Citus non-main database.
*/
if (!CitusHasBeenLoaded())
{
return "";
}
char *authinfo = "";
Datum nodeIdDatumArray[2] = {
Int32GetDatum(nodeId),

View File

@ -83,6 +83,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
*/
char *
MarkObjectsDistributedCreateCommand(List *addresses,
List *namesArg,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations)
@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses,
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
List *names = NIL;
List *args = NIL;
char *objectType = NULL;
char *objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, false);
if (IsMainDBCommand)
{
/*
* When we try to distribute an object that's being created from a database other
* than the Citus main database, we cannot look up its name, since the object is
* not visible in the Citus main database.
* Because of that, we need to pass the name to this function.
*/
names = list_nth(namesArg, currentObjectCounter);
bool missingOk = false;
objectType = getObjectTypeDescription(address, missingOk);
}
else
{
objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, IsMainDBCommand);
}
if (!isFirstObject)
{
@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context)
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(list_make1(address),
NIL,
list_make1_int(distributionArgumentIndex),
list_make1_int(colocationId),
list_make1_int(forceDelegation));

View File

@ -94,6 +94,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/repartition_executor.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
DefineCustomStringVariable(
"citus.superuser",
gettext_noop("Name of a superuser role to be used in Citus main database "
"connections"),
NULL,
&SuperuserRole,
"",
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
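A possible setup sketch for this GUC; citus.main_db is assumed here from the MainDb/IsMainDB logic elsewhere in this diff, its definition is not shown:
-- run as a superuser on each node; a reload or restart may be needed depending on
-- the GUC context:
ALTER SYSTEM SET citus.main_db = 'citus_main';
ALTER SYSTEM SET citus.superuser = 'postgres';
SELECT pg_reload_conf();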
DefineCustomEnumVariable(
"citus.task_assignment_policy",
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status)
*/
InitializeBackendData(port->application_name);
IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 ||
strncmp(MainDb, port->database_name, NAMEDATALEN) == 0);
/* let other authentication hooks to kick in first */
if (original_client_auth_hook)

View File

@ -3,3 +3,10 @@
#include "udfs/citus_internal_database_command/12.2-1.sql"
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
#include "udfs/start_management_transaction/12.2-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
#include "udfs/mark_object_distributed/12.2-1.sql"
#include "udfs/commit_management_command_2pc/12.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;

View File

@ -3,3 +3,20 @@
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
DROP FUNCTION citus_internal.start_management_transaction(
outer_xid xid8
);
DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(
query text,
username text
);
DROP FUNCTION citus_internal.mark_object_distributed(
classId Oid, objectName text, objectId Oid
);
DROP FUNCTION citus_internal.commit_management_command_2pc();
ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.commit_management_command_2pc()
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$;
COMMENT ON FUNCTION citus_internal.commit_management_command_2pc()
IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
IS 'executes a query on the nodes other than the current one';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$execute_command_on_remote_nodes_as_user$$;
COMMENT ON FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(query text, username text)
IS 'executes a query on the nodes other than the current one';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid)
IS 'adds an object to pg_dist_object on all nodes';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
IS 'internal Citus function that starts a management transaction in the main database';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$start_management_transaction$$;
COMMENT ON FUNCTION citus_internal.start_management_transaction(outer_xid xid8)
IS 'internal Citus function that starts a management transaction in the main database';

View File

@ -19,14 +19,19 @@
#include "miscadmin.h"
#include "access/xact.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/xid8.h"
#include "distributed/backend_data.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
@ -56,6 +61,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection
static void Assign2PCIdentifier(MultiConnection *connection);
PG_FUNCTION_INFO_V1(start_management_transaction);
PG_FUNCTION_INFO_V1(execute_command_on_remote_nodes_as_user);
PG_FUNCTION_INFO_V1(commit_management_command_2pc);
static char *IsolationLevelName[] = {
"READ UNCOMMITTED",
@ -64,6 +72,154 @@ static char *IsolationLevelName[] = {
"SERIALIZABLE"
};
/*
* These variables are necessary for running queries from a database that is not
* the Citus main database. Some of these queries, such as CREATE ROLE, need to be
* propagated to the workers, and the Citus main database is used for them. For that
* we create a connection to the Citus main database and
* run queries from there.
*/
/* The MultiConnection used for connecting Citus main database. */
MultiConnection *MainDBConnection = NULL;
/*
* IsMainDBCommand is true if this is a query in the Citus main database that is started
* by a query from a different database.
*/
bool IsMainDBCommand = false;
/*
* The transaction id of the query from the other database that started the
* main database query.
*/
FullTransactionId OuterXid;
/*
* Shows if this is the Citus main database or not. We needed a variable instead of
* checking if this database's name is the same as MainDb because we sometimes need
* this value outside a transaction where we cannot reach the current database name.
*/
bool IsMainDB = true;
/*
* Name of a superuser role to be used during main database connections.
*/
char *SuperuserRole = NULL;
/*
* start_management_transaction starts a management transaction
* in the main database by recording the outer transaction's transaction id and setting
* IsMainDBCommand to true.
*/
Datum
start_management_transaction(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
OuterXid = PG_GETARG_FULLTRANSACTIONID(0);
IsMainDBCommand = true;
Use2PCForCoordinatedTransaction();
PG_RETURN_VOID();
}
/*
* execute_command_on_remote_nodes_as_user executes the query on the nodes
* other than the current node, using the user passed.
*/
Datum
execute_command_on_remote_nodes_as_user(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
text *queryText = PG_GETARG_TEXT_P(0);
char *query = text_to_cstring(queryText);
text *usernameText = PG_GETARG_TEXT_P(1);
char *username = text_to_cstring(usernameText);
StringInfo queryToSend = makeStringInfo();
appendStringInfo(queryToSend, "%s;%s;%s", DISABLE_METADATA_SYNC, query,
ENABLE_METADATA_SYNC);
SendCommandToWorkersAsUser(REMOTE_NODES, username, queryToSend->data);
PG_RETURN_VOID();
}
/*
* commit_management_command_2pc is a wrapper UDF for
* CoordinatedRemoteTransactionsCommit
*/
Datum
commit_management_command_2pc(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
RecoverTwoPhaseCommits();
PG_RETURN_VOID();
}
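When the command originates from a non-main database, the originating session drives the commit over its main-db connection; a rough sketch of what it sends (per the transaction_management.c changes in this diff):
-- at local pre-commit, over MainDBConnection:
COMMIT;
-- at the local commit callback, to finalize the prepared transactions on the workers:
SELECT citus_internal.commit_management_command_2pc();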
/*
* RunCitusMainDBQuery creates a connection to Citus main database if necessary
* and runs the query over the connection in the main database.
*/
void
RunCitusMainDBQuery(char *query)
{
if (MainDBConnection == NULL)
{
if (strlen(SuperuserRole) == 0)
{
ereport(ERROR, (errmsg("No superuser role is given for Citus main "
"database connection"),
errhint("Set citus.superuser to a superuser role name")));
}
int flags = 0;
MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName,
PostPortNumber,
SuperuserRole,
MainDb);
RemoteTransactionBegin(MainDBConnection);
}
SendRemoteCommand(MainDBConnection, query);
PGresult *result = GetRemoteCommandResult(MainDBConnection, true);
if (!IsResponseOK(result))
{
ReportResultError(MainDBConnection, result, ERROR);
}
ForgetResults(MainDBConnection);
}
/*
* CleanCitusMainDBConnection closes and removes the connection to Citus main database.
*/
void
CleanCitusMainDBConnection(void)
{
if (MainDBConnection == NULL)
{
return;
}
CloseConnection(MainDBConnection);
MainDBConnection = NULL;
}
/*
* StartRemoteTransactionBegin initiates beginning the remote transaction in
@ -616,7 +772,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
if (workerNode != NULL)
{
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid);
}
/*

View File

@ -21,6 +21,7 @@
#include "catalog/dependency.h"
#include "common/hashfn.h"
#include "nodes/print.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "utils/datum.h"
#include "utils/guc.h"
@ -46,6 +47,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shard_cleaner.h"
@ -55,6 +57,9 @@
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#define COMMIT_MANAGEMENT_COMMAND_2PC \
"SELECT citus_internal.commit_management_command_2pc()"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@ -317,12 +322,23 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED &&
!IsMainDBCommand)
{
/* handles both already prepared and open transactions */
CoordinatedRemoteTransactionsCommit();
}
/*
* If this database is not the Citus main database, we should try to commit the
* prepared transactions that the Citus main database created on the worker nodes.
*/
if (!IsMainDB && MainDBConnection != NULL)
{
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
CleanCitusMainDBConnection();
}
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
@ -378,6 +394,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
RemoveIntermediateResultsDirectories();
CleanCitusMainDBConnection();
/* handles both already prepared and open transactions */
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
{
@ -509,6 +527,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
break;
}
/*
* If this database is not the Citus main database, commit the query running in
* the Citus main database now; that way, if an error happens in that distributed
* query, we won't have committed the current query either.
*/
if (!IsMainDB && MainDBConnection != NULL)
{
RunCitusMainDBQuery("COMMIT");
}
/*
* TODO: It'd probably be a good idea to force constraints and
* such to 'immediate' here. Deferred triggers might try to send
@ -537,7 +566,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* us to mark failed placements as invalid. Better don't use
* this for anything important (i.e. DDL/metadata).
*/
CoordinatedRemoteTransactionsCommit();
if (IsMainDB)
{
CoordinatedRemoteTransactionsCommit();
}
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
}
@ -1139,18 +1171,17 @@ ResetPropagatedObjects(void)
/*
* HasAnyDependencyInPropagatedObjects decides if any dependency of given object is
* HasAnyObjectInPropagatedObjects decides if any of the objects in given list are
* propagated in the current transaction.
*/
bool
HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
HasAnyObjectInPropagatedObjects(List *objectList)
{
List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencyList)
ObjectAddress *object = NULL;
foreach_ptr(object, objectList)
{
/* first search in root transaction */
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency))
if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, object))
{
return true;
}
@ -1163,7 +1194,7 @@ HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
SubXactContext *state = NULL;
foreach_ptr(state, activeSubXactContexts)
{
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency))
if (DependencyInPropagatedObjectsHash(state->propagatedObjects, object))
{
return true;
}

View File

@ -29,10 +29,12 @@
#include "lib/stringinfo.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/xid8.h"
#include "pg_version_constants.h"
@ -82,7 +84,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
* prepared transaction should be committed.
*/
void
LogTransactionRecord(int32 groupId, char *transactionName)
LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid)
{
Datum values[Natts_pg_dist_transaction];
bool isNulls[Natts_pg_dist_transaction];
@ -93,6 +95,7 @@ LogTransactionRecord(int32 groupId, char *transactionName)
values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId);
values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName);
values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid);
/* open transaction relation and insert new tuple */
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
@ -258,6 +261,54 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
continue;
}
/* Check if the transaction is created by an outer transaction from a non-main database */
bool outerXidIsNull = false;
Datum outerXidDatum = heap_getattr(heapTuple,
Anum_pg_dist_transaction_outerxid,
tupleDescriptor, &outerXidIsNull);
TransactionId outerXid = 0;
if (!outerXidIsNull)
{
FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum);
outerXid = XidFromFullTransactionId(outerFullXid);
}
if (outerXid != 0)
{
bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid);
bool outerXactDidCommit = TransactionIdDidCommit(outerXid);
if (outerXactIsInProgress && !outerXactDidCommit)
{
/*
* The transaction is initiated from an outer transaction and the outer
* transaction is not yet committed, so we should not commit either.
* We remove this transaction from the pendingTransactionSet so it'll
* not be aborted by the loop below.
*/
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
&foundPreparedTransactionBeforeCommit);
continue;
}
else if (!outerXactIsInProgress && !outerXactDidCommit)
{
/*
* Since the outer transaction isn't in progress and did not commit, we need to
* abort the prepared transaction too. We do this by simply doing the same
* thing we would do for transactions that are initiated from the main
* database.
*/
continue;
}
else
{
/*
* Outer transaction did commit, so we can try to commit the prepared
* transaction too.
*/
}
}
/*
* Remove the transaction from the pending list such that only transactions
* that need to be aborted remain at the end.

View File

@ -234,7 +234,8 @@ List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
List *workerNodeList = NIL;
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
if (targetWorkerSet == ALL_SHARD_NODES ||
targetWorkerSet == METADATA_NODES)
{
workerNodeList = ActivePrimaryNodeList(lockMode);
}

View File

@ -244,6 +244,8 @@ extern List * DropDatabaseStmtObjectAddress(Node *node, bool missingOk,
bool isPostprocess);
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missingOk,
bool isPostprocess);
extern List * GenerateGrantDatabaseCommandList(void);
extern List * PostprocessAlterDatabaseRenameStmt(Node *node, const char *queryString);
extern void EnsureSupportedCreateDatabaseCommand(CreatedbStmt *stmt);
extern char * CreateDatabaseDDLCommand(Oid dbId);
@ -440,6 +442,7 @@ extern List * CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok, bool
/* owned.c - forward declarations */
extern List * PreprocessDropOwnedStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessReassignOwnedStmt(Node *node, const char *queryString);
/* policy.c - forward declarations */
extern List * CreatePolicyCommands(Oid relationId);

View File

@ -231,6 +231,7 @@ extern void QualifyAlterRoleSetStmt(Node *stmt);
extern char * DeparseCreateRoleStmt(Node *stmt);
extern char * DeparseDropRoleStmt(Node *stmt);
extern char * DeparseGrantRoleStmt(Node *stmt);
extern char * DeparseReassignOwnedStmt(Node *node);
/* forward declarations for deparse_owned_stmts.c */
extern char * DeparseDropOwnedStmt(Node *node);
@ -251,6 +252,7 @@ extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
extern char * DeparseAlterDatabaseSetStmt(Node *node);
extern char * DeparseCreateDatabaseStmt(Node *node);
extern char * DeparseDropDatabaseStmt(Node *node);
extern char * DeparseAlterDatabaseRenameStmt(Node *node);
/* forward declaration for deparse_publication_stmts.c */

View File

@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -89,6 +89,7 @@ extern List * NodeMetadataCreateCommands(void);
extern List * CitusTableMetadataCreateCommandList(Oid relationId);
extern List * NodeMetadataDropCommands(void);
extern char * MarkObjectsDistributedCreateCommand(List *addresses,
List *names,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations);

View File

@ -386,6 +386,7 @@ extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operat
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern void UndistributeTables(List *relationIdList);
extern void EnsureObjectAndDependenciesExistOnAllNodes(const ObjectAddress *target);
extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets);
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
ObjectAddress *

View File

@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction;
* compiler constants for pg_dist_transaction
* ----------------
*/
#define Natts_pg_dist_transaction 2
#define Natts_pg_dist_transaction 3
#define Anum_pg_dist_transaction_groupid 1
#define Anum_pg_dist_transaction_gid 2
#define Anum_pg_dist_transaction_outerxid 3
#endif /* PG_DIST_TRANSACTION_H */

View File

@ -144,4 +144,13 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId);
extern void RunCitusMainDBQuery(char *query);
extern void CleanCitusMainDBConnection(void);
extern bool IsMainDBCommand;
extern bool IsMainDB;
extern char *SuperuserRole;
extern char *MainDb;
extern struct MultiConnection *MainDBConnection;
#endif /* REMOTE_TRANSACTION_H */

View File

@ -163,7 +163,7 @@ extern bool MaybeExecutingUDF(void);
extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
extern void TrackPropagatedTableAndSequences(Oid relationId);
extern void ResetPropagatedObjects(void);
extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress);
extern bool HasAnyObjectInPropagatedObjects(List *objectList);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);

View File

@ -17,7 +17,8 @@ extern int Recover2PCInterval;
/* Functions declarations for worker transactions */
extern void LogTransactionRecord(int32 groupId, char *transactionName);
extern void LogTransactionRecord(int32 groupId, char *transactionName,
FullTransactionId outerXid);
extern int RecoverTwoPhaseCommits(void);
extern void DeleteWorkerTransactions(WorkerNode *workerNode);

View File

@ -0,0 +1,154 @@
def test_main_committed_outer_not_yet(cluster):
c = cluster.coordinator
w0 = cluster.workers[0]
# create a non-main database
c.sql("CREATE DATABASE db1")
# we will use cur1 to simulate non-main database user and
# cur2 to manually do the steps we would do in the main database
with c.cur(dbname="db1") as cur1, c.cur() as cur2:
# let's start a transaction and find its transaction id
cur1.execute("BEGIN")
cur1.execute("SELECT txid_current()")
txid = cur1.fetchall()
# using the transaction id of cur1, simulate the main database commands manually
cur2.execute("BEGIN")
cur2.execute(
"SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),)
)
cur2.execute(
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"
)
cur2.execute("COMMIT")
# run the transaction recovery
c.sql("SELECT recover_prepared_transactions()")
# user should not be created on the worker because outer transaction is not committed yet
role_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)
assert (
int(role_before_commit) == 0
), "role is on pg_dist_object despite not committing"
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
pdo_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"
# commit in cur1 so the transaction recovery thinks this is a successful transaction
cur1.execute("COMMIT")
# run the transaction recovery again after committing
c.sql("SELECT recover_prepared_transactions()")
# check that the user is created by the transaction recovery on the worker
role_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)
assert (
int(role_after_commit) == 1
), "role is not created during recovery despite committing"
# check that the user is on pg_dist_object on the worker after transaction recovery
pdo_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert (
int(pdo_after_commit) == 1
), "role is not on pg_dist_object after recovery despite committing"
c.sql("DROP DATABASE db1")
c.sql(
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('DROP USER u1', 'postgres')"
)
c.sql(
"""
SELECT run_command_on_workers($$
DELETE FROM pg_dist_object
WHERE objid::regrole::text = 'u1'
$$)
"""
)


def test_main_commited_outer_aborted(cluster):
    c = cluster.coordinator
    w0 = cluster.workers[0]

    # create a non-main database
    c.sql("CREATE DATABASE db2")

    # we will use cur1 to simulate non-main database user and
    # cur2 to manually do the steps we would do in the main database
    with c.cur(dbname="db2") as cur1, c.cur() as cur2:
        # let's start a transaction and find its transaction id
        cur1.execute("BEGIN")
        cur1.execute("SELECT txid_current()")
        txid = cur1.fetchall()

        # using the transaction id of the cur1 simulate the main database commands manually
        cur2.execute("BEGIN")
        cur2.execute(
            "SELECT citus_internal.start_management_transaction(%s)", (str(txid[0][0]),)
        )
        cur2.execute(
            "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')"
        )
        cur2.execute(
            "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)"
        )
        cur2.execute("COMMIT")

        # abort cur1 so the transaction recovery thinks this is an aborted transaction
        cur1.execute("ABORT")

        # check that the user is not yet created on the worker
        role_before_recovery = w0.sql_value(
            "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'"
        )

        assert int(role_before_recovery) == 0, "role is already created before recovery"

        # check that the user is not on pg_dist_object on the worker
        pdo_before_recovery = w0.sql_value(
            "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
        )

        assert (
            int(pdo_before_recovery) == 0
        ), "role is already on pg_dist_object before recovery"

        # run the transaction recovery
        c.sql("SELECT recover_prepared_transactions()")

        # check that the user is not created by the transaction recovery on the worker
        role_after_recovery = w0.sql_value(
            "SELECT count(*) FROM pg_roles WHERE rolname = 'u2'"
        )

        assert (
            int(role_after_recovery) == 0
        ), "role is created during recovery despite aborting"

        # check that the user is not on pg_dist_object on the worker after transaction recovery
        pdo_after_recovery = w0.sql_value(
            "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
        )

        assert (
            int(pdo_after_recovery) == 0
        ), "role is on pg_dist_object after recovery despite aborting"

    c.sql("DROP DATABASE db2")

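As plain SQL, the management-transaction sequence the tests above drive through cur2 looks roughly as follows; a minimal sketch in which the xid8 value and the object id are hypothetical placeholders (in the tests they come from txid_current() in the non-main database and from the created role):

BEGIN;
SELECT citus_internal.start_management_transaction('1234'::xid8);
SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres');
SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123);
COMMIT;
-- recovery later commits or rolls back the prepared remote work depending on the outer transaction's fate
SELECT recover_prepared_transactions();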
View File

@ -1,38 +1,30 @@
set citus.log_remote_commands = true;
set citus.grep_remote_commands = '%ALTER DATABASE%';
-- since the ALLOW_CONNECTIONS alter option should be executed in a different database
-- and since we don't have multiple database support for now,
-- this statement will error
alter database regression ALLOW_CONNECTIONS false;
ERROR: ALLOW_CONNECTIONS is not supported
alter database regression with CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true' CONNECTION LIMIT 50;
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true CONNECTION LIMIT 50;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database regression with CONNECTION LIMIT -1;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
NOTICE: issuing ALTER DATABASE regression WITH CONNECTION LIMIT -1;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database regression with IS_TEMPLATE true;
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'true';
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE true;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database regression with IS_TEMPLATE false;
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE 'false';
NOTICE: issuing ALTER DATABASE regression WITH IS_TEMPLATE false;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- this statement will error since we don't have multiple database support for now
alter database regression rename to regression2;
ERROR: current database cannot be renamed
alter database regression set default_transaction_read_only = true;
NOTICE: issuing ALTER DATABASE regression SET default_transaction_read_only = 'true'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -147,4 +139,86 @@ NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.enable_create_database_propagation=on;
create database "regression!'2";
alter database "regression!'2" with CONNECTION LIMIT 100;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE true CONNECTION LIMIT 50;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
alter database "regression!'2" with IS_TEMPLATE false;
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'2" WITH IS_TEMPLATE false;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :worker_1_port
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :worker_2_port
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :master_port
set citus.log_remote_commands = true;
set citus.grep_remote_commands = '%ALTER DATABASE%';
alter database "regression!'2" set TABLESPACE alter_db_tablespace;
NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'2" SET TABLESPACE alter_db_tablespace
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.enable_create_database_propagation=on;
alter database "regression!'2" rename to regression3;
NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'2" RENAME TO regression3
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- check that the local database rename and alter commands are not propagated
set citus.enable_create_database_propagation=off;
CREATE database local_regression;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
alter DATABASE local_regression with CONNECTION LIMIT 100;
alter DATABASE local_regression rename to local_regression2;
drop database local_regression2;
set citus.enable_create_database_propagation=on;
drop database regression3;
create database "regression!'4";
SELECT result FROM run_command_on_all_nodes(
$$
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
$$
);
result
---------------------------------------------------------------------
ALTER TABLESPACE
ALTER TABLESPACE
ALTER TABLESPACE
(3 rows)
alter database "regression!'4" set TABLESPACE "ts-needs\!escape";
NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE "regression!'4" SET TABLESPACE "ts-needs\!escape"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
drop database "regression!'4";
set citus.log_remote_commands = false;
set citus.enable_create_database_propagation=off;
SELECT result FROM run_command_on_all_nodes(
$$
drop tablespace "ts-needs\!escape"
$$
);
result
---------------------------------------------------------------------
DROP TABLESPACE
DROP TABLESPACE
DROP TABLESPACE
(3 rows)

View File

@ -285,14 +285,7 @@ SELECT citus_schema_undistribute('tenant1');
ERROR: must be owner of schema tenant1
-- assign all tables to dummyregular except table5
SET role tenantuser;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO dummyregular; $$);
result
---------------------------------------------------------------------
REASSIGN OWNED
REASSIGN OWNED
REASSIGN OWNED
(3 rows)
REASSIGN OWNED BY tenantuser TO dummyregular;
CREATE TABLE tenant1.table5(id int);
-- table owner check fails the distribution
SET role dummyregular;
@ -366,14 +359,7 @@ SELECT result FROM run_command_on_all_nodes($$ SELECT array_agg(logicalrelid ORD
(3 rows)
RESET role;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO tenantuser; $$);
result
---------------------------------------------------------------------
REASSIGN OWNED
REASSIGN OWNED
REASSIGN OWNED
(3 rows)
REASSIGN OWNED BY dummyregular TO tenantuser;
DROP USER dummyregular;
CREATE USER dummysuper superuser;
SET role dummysuper;

View File

@ -189,14 +189,7 @@ SELECT citus_schema_move('s2', 'dummy_node', 1234);
ERROR: must be owner of schema s2
-- assign all tables to regularuser
RESET ROLE;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
result
---------------------------------------------------------------------
REASSIGN OWNED
REASSIGN OWNED
REASSIGN OWNED
(3 rows)
REASSIGN OWNED BY tenantuser TO regularuser;
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;
SET ROLE regularuser;
SELECT nodeid AS s2_new_nodeid, quote_literal(nodename) AS s2_new_nodename, nodeport AS s2_new_nodeport

View File

@ -209,19 +209,7 @@ SELECT result FROM run_command_on_all_nodes(
CREATE USER "role-needs\!escape";
CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape";
-- Rename it to make check_database_on_all_nodes happy.
-- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually.
SELECT result FROM run_command_on_all_nodes(
$$
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape
$$
);
result
---------------------------------------------------------------------
ALTER DATABASE
ALTER DATABASE
ALTER DATABASE
(3 rows)
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape;
SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------

View File

@ -0,0 +1,154 @@
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA failure_non_main_db_2pc;
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
CREATE DATABASE other_db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db1
CREATE USER user_1;
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 | user_1
1 | user_1
2 |
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
1
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 | user_1
1 | user_1
2 | user_1
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db1
CREATE USER user_2;
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
while executing command on localhost:xxxxx
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 |
1 |
2 |
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
nodeid | result
---------------------------------------------------------------------
0 |
1 |
2 |
(3 rows)
DROP DATABASE other_db1;
-- user_2 should not exist because the query to create it will fail
-- but let's make sure we try to drop it just in case
DROP USER IF EXISTS user_1, user_2;
NOTICE: role "user_2" does not exist, skipping
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
CREATE DATABASE other_db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c other_db2
CREATE USER user_3;
\c regression
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
result
---------------------------------------------------------------------

 user_3
 user_3
(3 rows)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
result
---------------------------------------------------------------------
user_3
user_3
user_3
(3 rows)
DROP DATABASE other_db2;
DROP USER user_3;
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
result
---------------------------------------------------------------------
DELETE 1
DELETE 1
DELETE 1
(3 rows)
DROP SCHEMA failure_non_main_db_2pc;

View File

@ -1420,10 +1420,14 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 12.2-1
ALTER EXTENSION citus UPDATE TO '12.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal.commit_management_command_2pc() void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.mark_object_distributed(oid,text,oid) void
| function citus_internal.start_management_transaction(xid8) void
| function citus_internal_database_command(text) void
(1 row)
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -64,7 +64,7 @@ SELECT recover_prepared_transactions();
(1 row)
-- delete the citus_122_should_do_nothing transaction
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
groupid | gid
---------------------------------------------------------------------
122 | citus_122_should_do_nothing

View File

@ -0,0 +1,130 @@
CREATE SCHEMA other_databases;
SET search_path TO other_databases;
SET citus.next_shard_id TO 10231023;
CREATE DATABASE other_db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c other_db1
SHOW citus.main_db;
citus.main_db
---------------------------------------------------------------------
regression
(1 row)
-- check that empty citus.superuser gives error
SET citus.superuser TO '';
CREATE USER empty_superuser;
ERROR: No superuser role is given for Citus main database connection
HINT: Set citus.superuser to a superuser role name
SET citus.superuser TO 'postgres';
CREATE USER other_db_user1;
CREATE USER other_db_user2;
BEGIN;
CREATE USER other_db_user3;
CREATE USER other_db_user4;
COMMIT;
BEGIN;
CREATE USER other_db_user5;
CREATE USER other_db_user6;
ROLLBACK;
BEGIN;
CREATE USER other_db_user7;
SELECT 1/0;
ERROR: division by zero
COMMIT;
CREATE USER other_db_user8;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user1
other_db_user2
other_db_user3
other_db_user4
other_db_user8
(5 rows)
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user1
other_db_user2
other_db_user3
other_db_user4
other_db_user8
(5 rows)
\c - - - :master_port
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
NOTICE: role "other_db_user5" does not exist, skipping
NOTICE: role "other_db_user6" does not exist, skipping
NOTICE: role "other_db_user7" does not exist, skipping
-- Make sure non-superuser roles cannot use internal GUCs
-- but they can still create a role
CREATE USER nonsuperuser CREATEROLE;
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
ERROR: operation is not allowed
HINT: Run the command with a superuser.
\c other_db1
CREATE USER other_db_user9;
RESET ROLE;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user9
(1 row)
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
other_db_user9
(1 row)
\c - - - :master_port
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser;
-- test from a worker
\c - - - :worker_1_port
CREATE DATABASE other_db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c other_db2
CREATE USER worker_user1;
BEGIN;
CREATE USER worker_user2;
COMMIT;
BEGIN;
CREATE USER worker_user3;
ROLLBACK;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
worker_user1
worker_user2
(2 rows)
\c - - - :master_port
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
usename
---------------------------------------------------------------------
worker_user1
worker_user2
(2 rows)
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
NOTICE: role "worker_user3" does not exist, skipping
\c - - - :worker_1_port
DROP DATABASE other_db2;
\c - - - :master_port
DROP SCHEMA other_databases;
DROP DATABASE other_db1;

View File

@ -0,0 +1,194 @@
CREATE ROLE distributed_source_role1;
create ROLE "distributed_source_role-\!";
CREATE ROLE "distributed_target_role1-\!";
set citus.enable_create_role_propagation to off;
create ROLE local_target_role1;
NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to other nodes directly to manually create all necessary users and roles.
\c - - - :worker_1_port
set citus.enable_create_role_propagation to off;
CREATE ROLE local_target_role1;
NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to other nodes directly to manually create all necessary users and roles.
\c - - - :master_port
set citus.enable_create_role_propagation to off;
create role local_source_role1;
NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to other nodes directly to manually create all necessary users and roles.
reset citus.enable_create_role_propagation;
GRANT CREATE ON SCHEMA public TO distributed_source_role1,"distributed_source_role-\!";
SET ROLE distributed_source_role1;
CREATE TABLE public.test_table (col1 int);
set role "distributed_source_role-\!";
CREATE TABLE public.test_table2 (col2 int);
RESET ROLE;
select create_distributed_table('test_table', 'col1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('test_table2', 'col2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2')
ORDER BY tablename
) q2
$$
) ORDER BY result;
result
---------------------------------------------------------------------
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}]
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}]
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_source_role1"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_source_role-\\!"}]
(3 rows)
--tests for REASSIGN OWNED BY with multiple distributed roles and a local role to a distributed role
--local role should be ignored
set citus.log_remote_commands to on;
set citus.grep_remote_commands = '%REASSIGN OWNED BY%';
REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO "distributed_target_role1-\!";
NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO "distributed_target_role1-\!"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO "distributed_target_role1-\!"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
reset citus.grep_remote_commands;
reset citus.log_remote_commands;
--check if the owner changed to "distributed_target_role1-\!"
RESET citus.log_remote_commands;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2')
ORDER BY tablename
) q2
$$
) ORDER BY result;
result
---------------------------------------------------------------------
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}]
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}]
[{"tablename": "test_table", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}, {"tablename": "test_table2", "schemaname": "public", "tableowner": "distributed_target_role1-\\!"}]
(3 rows)
--tests for REASSIGN OWNED BY with multiple distributed roles and a local role to a local role
--local role should be ignored
SET ROLE distributed_source_role1;
CREATE TABLE public.test_table3 (col1 int);
set role "distributed_source_role-\!";
CREATE TABLE public.test_table4 (col2 int);
RESET ROLE;
select create_distributed_table('test_table3', 'col1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('test_table4', 'col2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
set citus.log_remote_commands to on;
set citus.grep_remote_commands = '%REASSIGN OWNED BY%';
set citus.enable_create_role_propagation to off;
set citus.enable_alter_role_propagation to off;
set citus.enable_alter_role_set_propagation to off;
REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO local_target_role1;
NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO local_target_role1
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing REASSIGN OWNED BY distributed_source_role1, "distributed_source_role-\!" TO local_target_role1
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
show citus.enable_create_role_propagation;
citus.enable_create_role_propagation
---------------------------------------------------------------------
off
(1 row)
show citus.enable_alter_role_propagation;
citus.enable_alter_role_propagation
---------------------------------------------------------------------
off
(1 row)
show citus.enable_alter_role_set_propagation;
citus.enable_alter_role_set_propagation
---------------------------------------------------------------------
off
(1 row)
reset citus.grep_remote_commands;
reset citus.log_remote_commands;
reset citus.enable_create_role_propagation;
reset citus.enable_alter_role_propagation;
reset citus.enable_alter_role_set_propagation;
--check if the owner changed to local_target_role1
SET citus.log_remote_commands = false;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table3', 'test_table4')
ORDER BY tablename
) q2
$$
) ORDER BY result;
result
---------------------------------------------------------------------
[{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}]
[{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}]
[{"tablename": "test_table3", "schemaname": "public", "tableowner": "local_target_role1"}, {"tablename": "test_table4", "schemaname": "public", "tableowner": "local_target_role1"}]
(3 rows)
--clear resources
DROP OWNED BY distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2', 'test_table3', 'test_table4')
) q2
$$
) ORDER BY result;
result
---------------------------------------------------------------------
(3 rows)
set client_min_messages to warning;
drop role distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1,local_source_role1;

View File

@ -56,13 +56,17 @@ ORDER BY 1;
function citus_get_active_worker_nodes()
function citus_get_node_clock()
function citus_get_transaction_clock()
function citus_internal.commit_management_command_2pc()
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.mark_object_distributed(oid,text,oid)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
function citus_internal.pg_dist_shard_placement_trigger_func()
function citus_internal.refresh_isolation_tester_prepared_statement()
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.start_management_transaction(xid8)
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
@ -344,5 +348,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(334 rows)
(338 rows)

View File

@ -34,6 +34,7 @@ test: failure_multi_row_insert
test: failure_mx_metadata_sync
test: failure_mx_metadata_sync_multi_trans
test: failure_connection_establishment
test: failure_non_main_db_2pc
# this test syncs metadata to the workers
test: failure_failover_to_local_execution

View File

@ -59,6 +59,7 @@ test: grant_on_database_propagation
test: alter_database_propagation
test: citus_shards
test: reassign_owned
# ----------
# multi_citus_tools tests utility functions written for citus tools

View File

@ -108,6 +108,7 @@ test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
test: background_task_queue_monitor
test: other_databases
# Causal clock test
test: clock

View File

@ -490,6 +490,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.stat_tenants_limit = 2");
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
push(@pgOptions, "citus.main_db = 'regression'");
push(@pgOptions, "citus.superuser = 'postgres'");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
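
The two GUCs added to the regress harness above also exist outside the harness; a minimal sketch of setting them on a running cluster (the values mirror the test defaults and are only an assumption here; GUCs with postmaster context would need a restart rather than a reload):

ALTER SYSTEM SET citus.main_db = 'regression';
ALTER SYSTEM SET citus.superuser = 'postgres';
SELECT pg_reload_conf();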

View File

@ -1,20 +1,12 @@
set citus.log_remote_commands = true;
set citus.grep_remote_commands = '%ALTER DATABASE%';
-- since the ALLOW_CONNECTIONS alter option should be executed in a different database
-- and since we don't have multiple database support for now,
-- this statement will error
alter database regression ALLOW_CONNECTIONS false;
alter database regression with CONNECTION LIMIT 100;
alter database regression with IS_TEMPLATE true CONNECTION LIMIT 50;
alter database regression with CONNECTION LIMIT -1;
alter database regression with IS_TEMPLATE true;
alter database regression with IS_TEMPLATE false;
-- this statement will error since we don't have multiple database support for now
alter database regression rename to regression2;
alter database regression set default_transaction_read_only = true;
@ -56,4 +48,66 @@ alter database regression set lock_timeout from current;
alter database regression set lock_timeout to DEFAULT;
alter database regression RESET lock_timeout;
set citus.enable_create_database_propagation=on;
create database "regression!'2";
alter database "regression!'2" with CONNECTION LIMIT 100;
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
alter database "regression!'2" with IS_TEMPLATE false;
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts3'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :worker_1_port
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts4'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :worker_2_port
\set alter_db_tablespace :abs_srcdir '/tmp_check/ts5'
CREATE TABLESPACE alter_db_tablespace LOCATION :'alter_db_tablespace';
\c - - - :master_port
set citus.log_remote_commands = true;
set citus.grep_remote_commands = '%ALTER DATABASE%';
alter database "regression!'2" set TABLESPACE alter_db_tablespace;
set citus.enable_create_database_propagation=on;
alter database "regression!'2" rename to regression3;
-- check that the local database rename and alter commands are not propagated
set citus.enable_create_database_propagation=off;
CREATE database local_regression;
alter DATABASE local_regression with CONNECTION LIMIT 100;
alter DATABASE local_regression rename to local_regression2;
drop database local_regression2;
set citus.enable_create_database_propagation=on;
drop database regression3;
create database "regression!'4";
SELECT result FROM run_command_on_all_nodes(
$$
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
$$
);
alter database "regression!'4" set TABLESPACE "ts-needs\!escape";
drop database "regression!'4";
set citus.log_remote_commands = false;
set citus.enable_create_database_propagation=off;
SELECT result FROM run_command_on_all_nodes(
$$
drop tablespace "ts-needs\!escape"
$$
);

View File

@ -185,7 +185,7 @@ SELECT citus_schema_undistribute('tenant1');
-- assign all tables to dummyregular except table5
SET role tenantuser;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO dummyregular; $$);
REASSIGN OWNED BY tenantuser TO dummyregular;
CREATE TABLE tenant1.table5(id int);
-- table owner check fails the distribution
@ -219,7 +219,7 @@ SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*)=0 FROM pg_dist_co
SELECT result FROM run_command_on_all_nodes($$ SELECT array_agg(logicalrelid ORDER BY logicalrelid) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'tenant1.%' AND colocationid > 0 $$);
RESET role;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY dummyregular TO tenantuser; $$);
REASSIGN OWNED BY dummyregular TO tenantuser;
DROP USER dummyregular;
CREATE USER dummysuper superuser;

View File

@ -147,7 +147,7 @@ SELECT citus_schema_move('s2', 'dummy_node', 1234);
-- assign all tables to regularuser
RESET ROLE;
SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY tenantuser TO regularuser; $$);
REASSIGN OWNED BY tenantuser TO regularuser;
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;

View File

@ -129,13 +129,8 @@ CREATE USER "role-needs\!escape";
CREATE DATABASE "db-needs\!escape" owner "role-needs\!escape" tablespace "ts-needs\!escape";
-- Rename it to make check_database_on_all_nodes happy.
-- Today we don't support ALTER DATABASE .. RENAME TO .., so need to propagate it manually.
SELECT result FROM run_command_on_all_nodes(
$$
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape
$$
);
ALTER DATABASE "db-needs\!escape" RENAME TO db_needs_escape;
SELECT * FROM public.check_database_on_all_nodes('db_needs_escape') ORDER BY node_type;
-- test database syncing after node addition
@ -541,6 +536,7 @@ REVOKE CONNECT ON DATABASE test_db FROM propagated_role;
DROP DATABASE test_db;
DROP ROLE propagated_role, non_propagated_role;
--clean up resources created by this test
-- DROP TABLESPACE is not supported, so we need to drop it manually.

View File

@ -0,0 +1,75 @@
SELECT citus.mitmproxy('conn.allow()');
CREATE SCHEMA failure_non_main_db_2pc;
SET SEARCH_PATH TO 'failure_non_main_db_2pc';
CREATE DATABASE other_db1;
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
\c other_db1
CREATE USER user_1;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_1'$$) ORDER BY 1;
SELECT citus.mitmproxy('conn.onQuery(query="CREATE USER user_2").kill()');
\c other_db1
CREATE USER user_2;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT nodeid, result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_2'$$) ORDER BY 1;
DROP DATABASE other_db1;
-- user_2 should not exist because the query to create it will fail
-- but let's make sure we try to drop it just in case
DROP USER IF EXISTS user_1, user_2;
SELECT citus_set_coordinator_host('localhost');
\c - - - :worker_1_port
CREATE DATABASE other_db2;
SELECT citus.mitmproxy('conn.onQuery(query="COMMIT PREPARED").kill()');
\c other_db2
CREATE USER user_3;
\c regression
SELECT citus.mitmproxy('conn.allow()');
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
SELECT recover_prepared_transactions();
SELECT result FROM run_command_on_all_nodes($$SELECT rolname FROM pg_roles WHERE rolname::TEXT = 'user_3'$$) ORDER BY 1;
DROP DATABASE other_db2;
DROP USER user_3;
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$DELETE FROM pg_dist_node WHERE groupid = 0$$);
DROP SCHEMA failure_non_main_db_2pc;

View File

@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing');
SELECT recover_prepared_transactions();
-- delete the citus_122_should_do_nothing transaction
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *;
DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid;
ROLLBACK PREPARED 'citus_122_should_do_nothing';
SELECT count(*) FROM pg_dist_transaction;

View File

@ -0,0 +1,98 @@
CREATE SCHEMA other_databases;
SET search_path TO other_databases;
SET citus.next_shard_id TO 10231023;
CREATE DATABASE other_db1;
\c other_db1
SHOW citus.main_db;
-- check that empty citus.superuser gives error
SET citus.superuser TO '';
CREATE USER empty_superuser;
SET citus.superuser TO 'postgres';
CREATE USER other_db_user1;
CREATE USER other_db_user2;
BEGIN;
CREATE USER other_db_user3;
CREATE USER other_db_user4;
COMMIT;
BEGIN;
CREATE USER other_db_user5;
CREATE USER other_db_user6;
ROLLBACK;
BEGIN;
CREATE USER other_db_user7;
SELECT 1/0;
COMMIT;
CREATE USER other_db_user8;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :master_port
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS other_db_user1, other_db_user2, other_db_user3, other_db_user4, other_db_user5, other_db_user6, other_db_user7, other_db_user8;
-- Make sure non-superuser roles cannot use internal GUCs
-- but they can still create a role
CREATE USER nonsuperuser CREATEROLE;
GRANT ALL ON SCHEMA citus_internal TO nonsuperuser;
SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
\c other_db1
CREATE USER other_db_user9;
RESET ROLE;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :worker_1_port
SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1;
\c - - - :master_port
REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser;
-- test from a worker
\c - - - :worker_1_port
CREATE DATABASE other_db2;
\c other_db2
CREATE USER worker_user1;
BEGIN;
CREATE USER worker_user2;
COMMIT;
BEGIN;
CREATE USER worker_user3;
ROLLBACK;
\c regression
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
\c - - - :master_port
SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
\c - - - :worker_1_port
DROP DATABASE other_db2;
\c - - - :master_port
DROP SCHEMA other_databases;
DROP DATABASE other_db1;

View File

@ -0,0 +1,141 @@
CREATE ROLE distributed_source_role1;
create ROLE "distributed_source_role-\!";
CREATE ROLE "distributed_target_role1-\!";
set citus.enable_create_role_propagation to off;
create ROLE local_target_role1;
\c - - - :worker_1_port
set citus.enable_create_role_propagation to off;
CREATE ROLE local_target_role1;
\c - - - :master_port
set citus.enable_create_role_propagation to off;
create role local_source_role1;
reset citus.enable_create_role_propagation;
GRANT CREATE ON SCHEMA public TO distributed_source_role1,"distributed_source_role-\!";
SET ROLE distributed_source_role1;
CREATE TABLE public.test_table (col1 int);
set role "distributed_source_role-\!";
CREATE TABLE public.test_table2 (col2 int);
RESET ROLE;
select create_distributed_table('test_table', 'col1');
select create_distributed_table('test_table2', 'col2');
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2')
ORDER BY tablename
) q2
$$
) ORDER BY result;
--tests for REASSIGN OWNED BY with multiple distributed roles and a local role to a distributed role
--local role should be ignored
set citus.log_remote_commands to on;
set citus.grep_remote_commands = '%REASSIGN OWNED BY%';
REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO "distributed_target_role1-\!";
reset citus.grep_remote_commands;
reset citus.log_remote_commands;
--check if the owner changed to "distributed_target_role1-\!"
RESET citus.log_remote_commands;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2')
ORDER BY tablename
) q2
$$
) ORDER BY result;
--tests for REASSIGN OWNED BY with multiple distributed roles and a local role to a local role
--local role should be ignored
SET ROLE distributed_source_role1;
CREATE TABLE public.test_table3 (col1 int);
set role "distributed_source_role-\!";
CREATE TABLE public.test_table4 (col2 int);
RESET ROLE;
select create_distributed_table('test_table3', 'col1');
select create_distributed_table('test_table4', 'col2');
set citus.log_remote_commands to on;
set citus.grep_remote_commands = '%REASSIGN OWNED BY%';
set citus.enable_create_role_propagation to off;
set citus.enable_alter_role_propagation to off;
set citus.enable_alter_role_set_propagation to off;
REASSIGN OWNED BY distributed_source_role1,"distributed_source_role-\!",local_source_role1 TO local_target_role1;
show citus.enable_create_role_propagation;
show citus.enable_alter_role_propagation;
show citus.enable_alter_role_set_propagation;
reset citus.grep_remote_commands;
reset citus.log_remote_commands;
reset citus.enable_create_role_propagation;
reset citus.enable_alter_role_propagation;
reset citus.enable_alter_role_set_propagation;
--check if the owner changed to local_target_role1
SET citus.log_remote_commands = false;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table3', 'test_table4')
ORDER BY tablename
) q2
$$
) ORDER BY result;
--clear resources
DROP OWNED BY distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1;
SELECT result from run_command_on_all_nodes(
$$
SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
SELECT
schemaname,
tablename,
tableowner
FROM
pg_tables
WHERE
tablename in ('test_table', 'test_table2', 'test_table3', 'test_table4')
) q2
$$
) ORDER BY result;
set client_min_messages to warning;
drop role distributed_source_role1, "distributed_source_role-\!","distributed_target_role1-\!",local_target_role1,local_source_role1;