Fixes distributed_object management

create_drop_db_gh
gindibay 2023-10-13 04:16:20 +03:00
parent 761fb13ac8
commit bc0a283221
3 changed files with 37 additions and 2 deletions

View File

@ -363,6 +363,25 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
EnsureCoordinator();
DropdbStmt *stmt = (DropdbStmt *) node;
Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok);
if (databaseOid == InvalidOid)
{
/* let regular ProcessUtility deal with IF NOT EXISTS */
return NIL;
}
ObjectAddress dbAddress = { 0 };
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
if (!IsObjectDistributed(&dbAddress))
{
return NIL;
}
UnmarkObjectDistributed(&dbAddress);
char *dropDatabaseCommand = DeparseTreeNode(node);
StringInfo internalDropCommand = makeStringInfo();
@ -370,9 +389,22 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
"SELECT pg_catalog.citus_internal_database_command(%s)",
quote_literal_cstr(dropDatabaseCommand));
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) internalDropCommand->data,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
List *
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
Oid databaseOid = get_database_oid(stmt->dbname, missing_ok);
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
return list_make1(dbAddress);
}

View File

@ -473,8 +473,8 @@ static DistributeObjectOps Database_Create = {
.postprocess = PostprocessCreateDatabaseStmt,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_CREATE,
.address = NULL,
.markDistributed = false,
.address = CreateDatabaseStmtObjectAddress,
.markDistributed = true,
};
static DistributeObjectOps Database_Drop = {

View File

@ -230,6 +230,9 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que
ProcessUtilityContext
processUtilityContext);
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);