From c9dae2684fdbbf3d91f740b0d5360542df74dd17 Mon Sep 17 00:00:00 2001 From: gindibay Date: Tue, 24 Oct 2023 10:09:13 +0300 Subject: [PATCH 1/2] tests db as role --- src/backend/distributed/commands/common.c | 30 +++++++++++++++++++ src/backend/distributed/commands/database.c | 20 ++----------- src/backend/distributed/commands/role.c | 4 +-- .../distributed/commands/utility_hook.c | 20 +++++-------- src/include/distributed/commands.h | 3 ++ 5 files changed, 44 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index 797981d47..bfc03cca1 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -28,6 +28,8 @@ #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" /* @@ -339,3 +341,31 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces return objectAddresses; } + +void UnmarkRolesAndDatabaseDistributed(Node *node) +{ + if (IsA(node, DropRoleStmt)) + { + DropRoleStmt *stmt = castNode(DropRoleStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) > 0) + { + UnmarkRolesDistributed(distributedDropRoles); + } + + } + else if (IsA(node, DropdbStmt)) + { + elog(LOG, "Unmarking database1 as distributed"); + DropdbStmt *stmt = castNode(DropdbStmt, node); + char *dbName = stmt->dbname; + + Oid dbOid = get_database_oid(dbName, stmt->missing_ok); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); + UnmarkObjectDistributed(dbAddress); + elog(LOG, "Unmarking database %s as distributed", dbName); + } +} diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 88aea7abb..26626d562 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -352,19 +352,6 @@ citus_internal_database_command(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - -static char * -GetUnmarkDatabaseDistributedSql(char *dbName) -{ - StringInfoData pg_dist_object_delete = { 0 }; - initStringInfo(&pg_dist_object_delete); - appendStringInfo(&pg_dist_object_delete, "delete from pg_dist_object where " - "objid in (select oid from pg_database where datname = '%s')", - dbName); - return pg_dist_object_delete.data; -} - - List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) @@ -375,6 +362,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, } EnsureCoordinator(); + EnsureSequentialModeForRoleDDL(); DropdbStmt *stmt = (DropdbStmt *) node; @@ -394,9 +382,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, return NIL; } - UnmarkObjectDistributed(&dbAddress); - char *unmarkDatabaseDistributedSql = GetUnmarkDatabaseDistributedSql(stmt->dbname); - char *dropDatabaseCommand = DeparseTreeNode(node); StringInfo internalDropCommand = makeStringInfo(); @@ -405,8 +390,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, quote_literal_cstr(dropDatabaseCommand)); - List *commands = list_make4(DISABLE_DDL_PROPAGATION, - unmarkDatabaseDistributedSql, + List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) internalDropCommand->data, ENABLE_DDL_PROPAGATION); diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 754be1a2b..34be44637 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -65,7 +65,6 @@ static DefElem * makeDefElemBool(char *name, bool value); static List * GenerateRoleOptionsList(HeapTuple tuple); static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options); static List * GenerateGrantRoleStmtsOfRole(Oid roleid); -static void EnsureSequentialModeForRoleDDL(void); static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); @@ -1080,6 +1079,7 @@ UnmarkRolesDistributed(List *roles) } ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid); + elog(LOG, "Unmarking role %s as distributed", role->rolename); UnmarkObjectDistributed(&roleAddress); } } @@ -1278,7 +1278,7 @@ CreateRoleStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) * with the role the role needs to be visible on all connections used by the transaction, * meaning we can only use 1 connection per node. */ -static void +void EnsureSequentialModeForRoleDDL(void) { if (!IsTransactionBlock()) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index dd729cad0..108f6b50a 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -80,6 +80,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "catalog/pg_database.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -148,6 +149,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, DestReceiver *dest, QueryCompletion *completionTag) { + elog(LOG, "multi_ProcessUtility called"); if (readOnlyTree) { pstmt = copyObject(pstmt); @@ -578,6 +580,8 @@ ProcessUtilityInternal(PlannedStmt *pstmt, PreprocessLockStatement((LockStmt *) parsetree, context); } + + /* * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * and distribute the partition if necessary. @@ -724,22 +728,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } /* - * Make sure that dropping the role deletes the pg_dist_object entries. There is a - * separate logic for roles, since roles are not included as dropped objects in the + * Make sure that dropping the role and database deletes the pg_dist_object entries. There is a + * separate logic for roles and database, since roles database are not included as dropped objects in the * drop event trigger. To handle it both on worker and coordinator nodes, it is not * implemented as a part of process functions but here. */ - if (IsA(parsetree, DropRoleStmt)) - { - DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree); - List *allDropRoles = stmt->roles; - - List *distributedDropRoles = FilterDistributedRoles(allDropRoles); - if (list_length(distributedDropRoles) > 0) - { - UnmarkRolesDistributed(distributedDropRoles); - } - } + UnmarkRolesAndDatabaseDistributed(parsetree); pstmt->utilityStmt = parsetree; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index b1f65177e..cde0bd4f0 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -193,6 +193,7 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo isPostprocess); extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern void UnmarkRolesAndDatabaseDistributed(Node *node); /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); @@ -241,6 +242,7 @@ extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString) extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); + /* domain.c - forward declarations */ extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); @@ -510,6 +512,7 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool extern void UnmarkRolesDistributed(List *roles); extern List * FilterDistributedRoles(List *roles); +extern void EnsureSequentialModeForRoleDDL(void); /* schema.c - forward declarations */ extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString); From 73f0db2aeddd6c39dfd6aed88afa7261349cfad3 Mon Sep 17 00:00:00 2001 From: gindibay Date: Tue, 24 Oct 2023 14:09:36 +0300 Subject: [PATCH 2/2] Fixes create and drop database transaction use --- src/backend/distributed/commands/common.c | 27 ---------- src/backend/distributed/commands/database.c | 51 +++++++++---------- .../commands/distribute_object_ops.c | 2 +- .../distributed/commands/utility_hook.c | 22 +++++++- src/backend/distributed/metadata/distobject.c | 27 ++++++++++ src/include/distributed/commands.h | 3 +- .../distributed/commands/utility_hook.h | 1 + src/include/distributed/metadata/distobject.h | 1 + 8 files changed, 77 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index bfc03cca1..b338792d8 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -342,30 +342,3 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces return objectAddresses; } -void UnmarkRolesAndDatabaseDistributed(Node *node) -{ - if (IsA(node, DropRoleStmt)) - { - DropRoleStmt *stmt = castNode(DropRoleStmt, node); - List *allDropRoles = stmt->roles; - - List *distributedDropRoles = FilterDistributedRoles(allDropRoles); - if (list_length(distributedDropRoles) > 0) - { - UnmarkRolesDistributed(distributedDropRoles); - } - - } - else if (IsA(node, DropdbStmt)) - { - elog(LOG, "Unmarking database1 as distributed"); - DropdbStmt *stmt = castNode(DropdbStmt, node); - char *dbName = stmt->dbname; - - Oid dbOid = get_database_oid(dbName, stmt->missing_ok); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); - UnmarkObjectDistributed(dbAddress); - elog(LOG, "Unmarking database %s as distributed", dbName); - } -} diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 26626d562..34813085e 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -273,16 +273,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) char *createDatabaseCommand = DeparseTreeNode(node); - StringInfo internalCreateCommand = makeStringInfo(); - appendStringInfo(internalCreateCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(createDatabaseCommand)); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) internalCreateCommand->data, + (void *) createDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); } @@ -356,55 +351,59 @@ List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { + bool isPostProcess = false; if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; } EnsureCoordinator(); - EnsureSequentialModeForRoleDDL(); DropdbStmt *stmt = (DropdbStmt *) node; - Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok); + List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, isPostProcess); - if (databaseOid == InvalidOid) + if (list_length(addresses) == 0) { - /* let regular ProcessUtility deal with IF NOT EXISTS */ return NIL; } - - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - if (!IsObjectDistributed(&dbAddress)) + ObjectAddress *address = (ObjectAddress *) linitial(addresses); + if (address->objectId == InvalidOid ||!IsObjectDistributed(address)) { return NIL; } char *dropDatabaseCommand = DeparseTreeNode(node); - StringInfo internalDropCommand = makeStringInfo(); - appendStringInfo(internalDropCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(dropDatabaseCommand)); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) internalDropCommand->data, + (void *) dropDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); } +static ObjectAddress *GetDatabaseAddressFromDatabaseName(char *databaseName) +{ + Oid databaseOid = get_database_oid(databaseName, false); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); + return dbAddress; +} + +List * +DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +{ + DropdbStmt *stmt = castNode(DropdbStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname); + return list_make1(dbAddress); +} List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) { CreatedbStmt *stmt = castNode(CreatedbStmt, node); - Oid databaseOid = get_database_oid(stmt->dbname, missing_ok); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); - + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname); return list_make1(dbAddress); } diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 49a96e016..2888bef1d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = { .postprocess = NULL, .objectType = OBJECT_DATABASE, .operationType = DIST_OPS_DROP, - .address = NULL, + .address = DropDatabaseStmtObjectAddress, .markDistributed = false, }; diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 108f6b50a..3cd2ccd1f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -729,7 +729,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, /* * Make sure that dropping the role and database deletes the pg_dist_object entries. There is a - * separate logic for roles and database, since roles database are not included as dropped objects in the + * separate logic for roles and database, since roles and database are not included as dropped objects in the * drop event trigger. To handle it both on worker and coordinator nodes, it is not * implemented as a part of process functions but here. */ @@ -1482,13 +1482,31 @@ DDLTaskList(Oid relationId, const char *commandString) return taskList; } +/* + * NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a + * given target set of nodes with cannotBeExecutedInTransction is set to make sure + * that list is being executed without a transaction. + */ +List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ){ + List *ddlJobs = NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + DDLJob *ddlJob = NULL; + foreach_ptr(ddlJob, ddlJobs) + { + Task *task = NULL; + foreach_ptr(task, ddlJob->taskList) + { + task->cannotBeExecutedInTransction = true; + } + } + return ddlJobs; +} /* * NodeDDLTaskList builds a list of tasks to execute a DDL command on a * given target set of nodes. */ List * -NodeDDLTaskList(TargetWorkerSet targets, List *commands) +NodeDDLTaskList(TargetWorkerSet targets, List *commands ) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetObjectAddress = InvalidObjectAddress; diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index c420e6ec3..722f51bc9 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -48,6 +48,8 @@ #include "utils/lsyscache.h" #include "utils/regproc.h" #include "utils/rel.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); @@ -355,6 +357,31 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, return spiStatus; } +void UnmarkRolesAndDatabaseDistributed(Node *node) +{ + if (IsA(node, DropRoleStmt)) + { + DropRoleStmt *stmt = castNode(DropRoleStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) > 0) + { + UnmarkRolesDistributed(distributedDropRoles); + } + + } + else if (IsA(node, DropdbStmt)) + { + DropdbStmt *stmt = castNode(DropdbStmt, node); + char *dbName = stmt->dbname; + + Oid dbOid = get_database_oid(dbName, stmt->missing_ok); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); + UnmarkObjectDistributed(dbAddress); + } +} /* * UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index cde0bd4f0..95d6e9a13 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -193,7 +193,6 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo isPostprocess); extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool isPostprocess); -extern void UnmarkRolesAndDatabaseDistributed(Node *node); /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); @@ -241,6 +240,8 @@ extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString); extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); +extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); /* domain.c - forward declarations */ diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ae57b49a..3295d110c 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -94,6 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); +extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index ba984091c..86fada5f7 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern void UnmarkRolesAndDatabaseDistributed(Node *node); extern bool IsTableOwnedByExtension(Oid relationId); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,