tests db as role

pull/7274/head
gindibay 2023-10-24 10:09:13 +03:00
parent 690276c516
commit c9dae2684f
5 changed files with 44 additions and 33 deletions

View File

@ -28,6 +28,8 @@
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_transaction.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
/*
@ -339,3 +341,31 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces
return objectAddresses;
}
void UnmarkRolesAndDatabaseDistributed(Node *node)
{
if (IsA(node, DropRoleStmt))
{
DropRoleStmt *stmt = castNode(DropRoleStmt, node);
List *allDropRoles = stmt->roles;
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
if (list_length(distributedDropRoles) > 0)
{
UnmarkRolesDistributed(distributedDropRoles);
}
}
else if (IsA(node, DropdbStmt))
{
elog(LOG, "Unmarking database1 as distributed");
DropdbStmt *stmt = castNode(DropdbStmt, node);
char *dbName = stmt->dbname;
Oid dbOid = get_database_oid(dbName, stmt->missing_ok);
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid);
UnmarkObjectDistributed(dbAddress);
elog(LOG, "Unmarking database %s as distributed", dbName);
}
}

View File

@ -352,19 +352,6 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
static char *
GetUnmarkDatabaseDistributedSql(char *dbName)
{
StringInfoData pg_dist_object_delete = { 0 };
initStringInfo(&pg_dist_object_delete);
appendStringInfo(&pg_dist_object_delete, "delete from pg_dist_object where "
"objid in (select oid from pg_database where datname = '%s')",
dbName);
return pg_dist_object_delete.data;
}
List *
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
@ -375,6 +362,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
}
EnsureCoordinator();
EnsureSequentialModeForRoleDDL();
DropdbStmt *stmt = (DropdbStmt *) node;
@ -394,9 +382,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}
UnmarkObjectDistributed(&dbAddress);
char *unmarkDatabaseDistributedSql = GetUnmarkDatabaseDistributedSql(stmt->dbname);
char *dropDatabaseCommand = DeparseTreeNode(node);
StringInfo internalDropCommand = makeStringInfo();
@ -405,8 +390,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
quote_literal_cstr(dropDatabaseCommand));
List *commands = list_make4(DISABLE_DDL_PROPAGATION,
unmarkDatabaseDistributedSql,
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) internalDropCommand->data,
ENABLE_DDL_PROPAGATION);

View File

@ -65,7 +65,6 @@ static DefElem * makeDefElemBool(char *name, bool value);
static List * GenerateRoleOptionsList(HeapTuple tuple);
static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options);
static List * GenerateGrantRoleStmtsOfRole(Oid roleid);
static void EnsureSequentialModeForRoleDDL(void);
static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple,
TupleDesc DbRoleSettingDescription);
@ -1080,6 +1079,7 @@ UnmarkRolesDistributed(List *roles)
}
ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid);
elog(LOG, "Unmarking role %s as distributed", role->rolename);
UnmarkObjectDistributed(&roleAddress);
}
}
@ -1278,7 +1278,7 @@ CreateRoleStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
* with the role the role needs to be visible on all connections used by the transaction,
* meaning we can only use 1 connection per node.
*/
static void
void
EnsureSequentialModeForRoleDDL(void)
{
if (!IsTransactionBlock())

View File

@ -80,6 +80,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "catalog/pg_database.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
@ -148,6 +149,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
DestReceiver *dest,
QueryCompletion *completionTag)
{
elog(LOG, "multi_ProcessUtility called");
if (readOnlyTree)
{
pstmt = copyObject(pstmt);
@ -578,6 +580,8 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PreprocessLockStatement((LockStmt *) parsetree, context);
}
/*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary.
@ -724,22 +728,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
}
/*
* Make sure that dropping the role deletes the pg_dist_object entries. There is a
* separate logic for roles, since roles are not included as dropped objects in the
* Make sure that dropping the role and database deletes the pg_dist_object entries. There is a
* separate logic for roles and database, since roles database are not included as dropped objects in the
* drop event trigger. To handle it both on worker and coordinator nodes, it is not
* implemented as a part of process functions but here.
*/
if (IsA(parsetree, DropRoleStmt))
{
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
List *allDropRoles = stmt->roles;
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
if (list_length(distributedDropRoles) > 0)
{
UnmarkRolesDistributed(distributedDropRoles);
}
}
UnmarkRolesAndDatabaseDistributed(parsetree);
pstmt->utilityStmt = parsetree;

View File

@ -193,6 +193,7 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo
isPostprocess);
extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern void UnmarkRolesAndDatabaseDistributed(Node *node);
/* index.c */
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
@ -241,6 +242,7 @@ extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
/* domain.c - forward declarations */
extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
@ -510,6 +512,7 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool
extern void UnmarkRolesDistributed(List *roles);
extern List * FilterDistributedRoles(List *roles);
extern void EnsureSequentialModeForRoleDDL(void);
/* schema.c - forward declarations */
extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString);