diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index 797981d47..b338792d8 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -28,6 +28,8 @@ #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" /* @@ -339,3 +341,4 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces return objectAddresses; } + diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 94de7f508..34813085e 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -273,16 +273,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) char *createDatabaseCommand = DeparseTreeNode(node); - StringInfo internalCreateCommand = makeStringInfo(); - appendStringInfo(internalCreateCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(createDatabaseCommand)); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) internalCreateCommand->data, + (void *) createDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); } @@ -352,37 +347,11 @@ citus_internal_database_command(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - -static char * -GetUnmarkDatabaseDistributedSql(char *dbName) -{ - StringInfoData pg_dist_object_delete = { 0 }; - initStringInfo(&pg_dist_object_delete); - appendStringInfo(&pg_dist_object_delete, "delete from pg_dist_object where " - "objid in (select oid from pg_database where datname = '%s')", - dbName); - return pg_dist_object_delete.data; -} - - -static void -UnmarkObjectDistributedForDropDb(const ObjectAddress *distAddress, char *dbName) -{ - UnmarkObjectDistributed(distAddress); - - if (EnableMetadataSync) - { - char *workerPgDistObjectUpdateCommand = - GetUnmarkDatabaseDistributedSql(dbName); - SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); - } -} - - List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { + bool isPostProcess = false; if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; @@ -392,49 +361,49 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, DropdbStmt *stmt = (DropdbStmt *) node; - Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok); + List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, isPostProcess); - if (databaseOid == InvalidOid) - { - /* let regular ProcessUtility deal with IF NOT EXISTS */ - return NIL; - } - - - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - if (!IsObjectDistributed(&dbAddress)) + if (list_length(addresses) == 0) { return NIL; } - UnmarkObjectDistributedForDropDb(&dbAddress, stmt->dbname); - char *unmarkDatabaseDistributedSql = GetUnmarkDatabaseDistributedSql(stmt->dbname); + ObjectAddress *address = (ObjectAddress *) linitial(addresses); + if (address->objectId == InvalidOid ||!IsObjectDistributed(address)) + { + return NIL; + } char *dropDatabaseCommand = DeparseTreeNode(node); - StringInfo internalDropCommand = makeStringInfo(); - appendStringInfo(internalDropCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(dropDatabaseCommand)); - - List *commands = list_make4(DISABLE_DDL_PROPAGATION, - unmarkDatabaseDistributedSql, - (void *) internalDropCommand->data, + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) dropDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); } +static ObjectAddress *GetDatabaseAddressFromDatabaseName(char *databaseName) +{ + Oid databaseOid = get_database_oid(databaseName, false); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); + return dbAddress; +} + +List * +DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +{ + DropdbStmt *stmt = castNode(DropdbStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname); + return list_make1(dbAddress); +} List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) { CreatedbStmt *stmt = castNode(CreatedbStmt, node); - Oid databaseOid = get_database_oid(stmt->dbname, missing_ok); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); - + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname); return list_make1(dbAddress); } diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 49a96e016..2888bef1d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = { .postprocess = NULL, .objectType = OBJECT_DATABASE, .operationType = DIST_OPS_DROP, - .address = NULL, + .address = DropDatabaseStmtObjectAddress, .markDistributed = false, }; diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 754be1a2b..34be44637 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -65,7 +65,6 @@ static DefElem * makeDefElemBool(char *name, bool value); static List * GenerateRoleOptionsList(HeapTuple tuple); static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options); static List * GenerateGrantRoleStmtsOfRole(Oid roleid); -static void EnsureSequentialModeForRoleDDL(void); static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); @@ -1080,6 +1079,7 @@ UnmarkRolesDistributed(List *roles) } ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid); + elog(LOG, "Unmarking role %s as distributed", role->rolename); UnmarkObjectDistributed(&roleAddress); } } @@ -1278,7 +1278,7 @@ CreateRoleStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) * with the role the role needs to be visible on all connections used by the transaction, * meaning we can only use 1 connection per node. */ -static void +void EnsureSequentialModeForRoleDDL(void) { if (!IsTransactionBlock()) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index dd729cad0..3cd2ccd1f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -80,6 +80,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "catalog/pg_database.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -148,6 +149,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, DestReceiver *dest, QueryCompletion *completionTag) { + elog(LOG, "multi_ProcessUtility called"); if (readOnlyTree) { pstmt = copyObject(pstmt); @@ -578,6 +580,8 @@ ProcessUtilityInternal(PlannedStmt *pstmt, PreprocessLockStatement((LockStmt *) parsetree, context); } + + /* * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * and distribute the partition if necessary. @@ -724,22 +728,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } /* - * Make sure that dropping the role deletes the pg_dist_object entries. There is a - * separate logic for roles, since roles are not included as dropped objects in the + * Make sure that dropping the role and database deletes the pg_dist_object entries. There is a + * separate logic for roles and database, since roles and database are not included as dropped objects in the * drop event trigger. To handle it both on worker and coordinator nodes, it is not * implemented as a part of process functions but here. */ - if (IsA(parsetree, DropRoleStmt)) - { - DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree); - List *allDropRoles = stmt->roles; - - List *distributedDropRoles = FilterDistributedRoles(allDropRoles); - if (list_length(distributedDropRoles) > 0) - { - UnmarkRolesDistributed(distributedDropRoles); - } - } + UnmarkRolesAndDatabaseDistributed(parsetree); pstmt->utilityStmt = parsetree; @@ -1488,13 +1482,31 @@ DDLTaskList(Oid relationId, const char *commandString) return taskList; } +/* + * NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a + * given target set of nodes with cannotBeExecutedInTransction is set to make sure + * that list is being executed without a transaction. + */ +List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ){ + List *ddlJobs = NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + DDLJob *ddlJob = NULL; + foreach_ptr(ddlJob, ddlJobs) + { + Task *task = NULL; + foreach_ptr(task, ddlJob->taskList) + { + task->cannotBeExecutedInTransction = true; + } + } + return ddlJobs; +} /* * NodeDDLTaskList builds a list of tasks to execute a DDL command on a * given target set of nodes. */ List * -NodeDDLTaskList(TargetWorkerSet targets, List *commands) +NodeDDLTaskList(TargetWorkerSet targets, List *commands ) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetObjectAddress = InvalidObjectAddress; diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index c420e6ec3..722f51bc9 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -48,6 +48,8 @@ #include "utils/lsyscache.h" #include "utils/regproc.h" #include "utils/rel.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); @@ -355,6 +357,31 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, return spiStatus; } +void UnmarkRolesAndDatabaseDistributed(Node *node) +{ + if (IsA(node, DropRoleStmt)) + { + DropRoleStmt *stmt = castNode(DropRoleStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) > 0) + { + UnmarkRolesDistributed(distributedDropRoles); + } + + } + else if (IsA(node, DropdbStmt)) + { + DropdbStmt *stmt = castNode(DropdbStmt, node); + char *dbName = stmt->dbname; + + Oid dbOid = get_database_oid(dbName, stmt->missing_ok); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); + UnmarkObjectDistributed(dbAddress); + } +} /* * UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index b1f65177e..95d6e9a13 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -240,6 +240,9 @@ extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString); extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); +extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); + /* domain.c - forward declarations */ extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool @@ -510,6 +513,7 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool extern void UnmarkRolesDistributed(List *roles); extern List * FilterDistributedRoles(List *roles); +extern void EnsureSequentialModeForRoleDDL(void); /* schema.c - forward declarations */ extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ae57b49a..3295d110c 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -94,6 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); +extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index ba984091c..86fada5f7 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern void UnmarkRolesAndDatabaseDistributed(Node *node); extern bool IsTableOwnedByExtension(Oid relationId); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,