mirror of https://github.com/citusdata/citus.git
Merge branch 'drop_db_as_role' into create_alter_database
commit
87f694ec0f
|
@ -28,6 +28,8 @@
|
|||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "catalog/pg_database.h"
|
||||
#include "commands/dbcommands.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -339,3 +341,4 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces
|
|||
|
||||
return objectAddresses;
|
||||
}
|
||||
|
||||
|
|
|
@ -273,16 +273,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
|||
|
||||
char *createDatabaseCommand = DeparseTreeNode(node);
|
||||
|
||||
StringInfo internalCreateCommand = makeStringInfo();
|
||||
appendStringInfo(internalCreateCommand,
|
||||
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
||||
quote_literal_cstr(createDatabaseCommand));
|
||||
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) internalCreateCommand->data,
|
||||
(void *) createDatabaseCommand,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
@ -352,37 +347,11 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
static char *
|
||||
GetUnmarkDatabaseDistributedSql(char *dbName)
|
||||
{
|
||||
StringInfoData pg_dist_object_delete = { 0 };
|
||||
initStringInfo(&pg_dist_object_delete);
|
||||
appendStringInfo(&pg_dist_object_delete, "delete from pg_dist_object where "
|
||||
"objid in (select oid from pg_database where datname = '%s')",
|
||||
dbName);
|
||||
return pg_dist_object_delete.data;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
UnmarkObjectDistributedForDropDb(const ObjectAddress *distAddress, char *dbName)
|
||||
{
|
||||
UnmarkObjectDistributed(distAddress);
|
||||
|
||||
if (EnableMetadataSync)
|
||||
{
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
GetUnmarkDatabaseDistributedSql(dbName);
|
||||
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
List *
|
||||
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
bool isPostProcess = false;
|
||||
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
||||
{
|
||||
return NIL;
|
||||
|
@ -392,49 +361,49 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
|||
|
||||
DropdbStmt *stmt = (DropdbStmt *) node;
|
||||
|
||||
Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok);
|
||||
List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, isPostProcess);
|
||||
|
||||
if (databaseOid == InvalidOid)
|
||||
{
|
||||
/* let regular ProcessUtility deal with IF NOT EXISTS */
|
||||
return NIL;
|
||||
}
|
||||
|
||||
|
||||
ObjectAddress dbAddress = { 0 };
|
||||
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||
if (!IsObjectDistributed(&dbAddress))
|
||||
if (list_length(addresses) == 0)
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
UnmarkObjectDistributedForDropDb(&dbAddress, stmt->dbname);
|
||||
char *unmarkDatabaseDistributedSql = GetUnmarkDatabaseDistributedSql(stmt->dbname);
|
||||
ObjectAddress *address = (ObjectAddress *) linitial(addresses);
|
||||
if (address->objectId == InvalidOid ||!IsObjectDistributed(address))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
char *dropDatabaseCommand = DeparseTreeNode(node);
|
||||
|
||||
StringInfo internalDropCommand = makeStringInfo();
|
||||
appendStringInfo(internalDropCommand,
|
||||
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
||||
quote_literal_cstr(dropDatabaseCommand));
|
||||
|
||||
|
||||
List *commands = list_make4(DISABLE_DDL_PROPAGATION,
|
||||
unmarkDatabaseDistributedSql,
|
||||
(void *) internalDropCommand->data,
|
||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
(void *) dropDatabaseCommand,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands);
|
||||
}
|
||||
|
||||
static ObjectAddress *GetDatabaseAddressFromDatabaseName(char *databaseName)
|
||||
{
|
||||
Oid databaseOid = get_database_oid(databaseName, false);
|
||||
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
||||
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
|
||||
return dbAddress;
|
||||
}
|
||||
|
||||
List *
|
||||
DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||
{
|
||||
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname);
|
||||
return list_make1(dbAddress);
|
||||
}
|
||||
|
||||
List *
|
||||
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||
{
|
||||
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||
Oid databaseOid = get_database_oid(stmt->dbname, missing_ok);
|
||||
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
||||
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
|
||||
|
||||
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname);
|
||||
return list_make1(dbAddress);
|
||||
}
|
||||
|
|
|
@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = {
|
|||
.postprocess = NULL,
|
||||
.objectType = OBJECT_DATABASE,
|
||||
.operationType = DIST_OPS_DROP,
|
||||
.address = NULL,
|
||||
.address = DropDatabaseStmtObjectAddress,
|
||||
.markDistributed = false,
|
||||
};
|
||||
|
||||
|
|
|
@ -65,7 +65,6 @@ static DefElem * makeDefElemBool(char *name, bool value);
|
|||
static List * GenerateRoleOptionsList(HeapTuple tuple);
|
||||
static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options);
|
||||
static List * GenerateGrantRoleStmtsOfRole(Oid roleid);
|
||||
static void EnsureSequentialModeForRoleDDL(void);
|
||||
|
||||
static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple,
|
||||
TupleDesc DbRoleSettingDescription);
|
||||
|
@ -1080,6 +1079,7 @@ UnmarkRolesDistributed(List *roles)
|
|||
}
|
||||
|
||||
ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid);
|
||||
elog(LOG, "Unmarking role %s as distributed", role->rolename);
|
||||
UnmarkObjectDistributed(&roleAddress);
|
||||
}
|
||||
}
|
||||
|
@ -1278,7 +1278,7 @@ CreateRoleStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
|||
* with the role the role needs to be visible on all connections used by the transaction,
|
||||
* meaning we can only use 1 connection per node.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
EnsureSequentialModeForRoleDDL(void)
|
||||
{
|
||||
if (!IsTransactionBlock())
|
||||
|
|
|
@ -80,6 +80,7 @@
|
|||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "catalog/pg_database.h"
|
||||
|
||||
|
||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||
|
@ -148,6 +149,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
DestReceiver *dest,
|
||||
QueryCompletion *completionTag)
|
||||
{
|
||||
elog(LOG, "multi_ProcessUtility called");
|
||||
if (readOnlyTree)
|
||||
{
|
||||
pstmt = copyObject(pstmt);
|
||||
|
@ -578,6 +580,8 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
PreprocessLockStatement((LockStmt *) parsetree, context);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
||||
* and distribute the partition if necessary.
|
||||
|
@ -724,22 +728,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
}
|
||||
|
||||
/*
|
||||
* Make sure that dropping the role deletes the pg_dist_object entries. There is a
|
||||
* separate logic for roles, since roles are not included as dropped objects in the
|
||||
* Make sure that dropping the role and database deletes the pg_dist_object entries. There is a
|
||||
* separate logic for roles and database, since roles and database are not included as dropped objects in the
|
||||
* drop event trigger. To handle it both on worker and coordinator nodes, it is not
|
||||
* implemented as a part of process functions but here.
|
||||
*/
|
||||
if (IsA(parsetree, DropRoleStmt))
|
||||
{
|
||||
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
|
||||
List *allDropRoles = stmt->roles;
|
||||
|
||||
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
|
||||
if (list_length(distributedDropRoles) > 0)
|
||||
{
|
||||
UnmarkRolesDistributed(distributedDropRoles);
|
||||
}
|
||||
}
|
||||
UnmarkRolesAndDatabaseDistributed(parsetree);
|
||||
|
||||
pstmt->utilityStmt = parsetree;
|
||||
|
||||
|
@ -1488,6 +1482,24 @@ DDLTaskList(Oid relationId, const char *commandString)
|
|||
return taskList;
|
||||
}
|
||||
|
||||
/*
|
||||
* NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a
|
||||
* given target set of nodes with cannotBeExecutedInTransction is set to make sure
|
||||
* that list is being executed without a transaction.
|
||||
*/
|
||||
List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ){
|
||||
List *ddlJobs = NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||
DDLJob *ddlJob = NULL;
|
||||
foreach_ptr(ddlJob, ddlJobs)
|
||||
{
|
||||
Task *task = NULL;
|
||||
foreach_ptr(task, ddlJob->taskList)
|
||||
{
|
||||
task->cannotBeExecutedInTransction = true;
|
||||
}
|
||||
}
|
||||
return ddlJobs;
|
||||
}
|
||||
|
||||
/*
|
||||
* NodeDDLTaskList builds a list of tasks to execute a DDL command on a
|
||||
|
|
|
@ -48,6 +48,8 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/regproc.h"
|
||||
#include "utils/rel.h"
|
||||
#include "catalog/pg_database.h"
|
||||
#include "commands/dbcommands.h"
|
||||
|
||||
|
||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||
|
@ -355,6 +357,31 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
|||
return spiStatus;
|
||||
}
|
||||
|
||||
void UnmarkRolesAndDatabaseDistributed(Node *node)
|
||||
{
|
||||
if (IsA(node, DropRoleStmt))
|
||||
{
|
||||
DropRoleStmt *stmt = castNode(DropRoleStmt, node);
|
||||
List *allDropRoles = stmt->roles;
|
||||
|
||||
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
|
||||
if (list_length(distributedDropRoles) > 0)
|
||||
{
|
||||
UnmarkRolesDistributed(distributedDropRoles);
|
||||
}
|
||||
|
||||
}
|
||||
else if (IsA(node, DropdbStmt))
|
||||
{
|
||||
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||
char *dbName = stmt->dbname;
|
||||
|
||||
Oid dbOid = get_database_oid(dbName, stmt->missing_ok);
|
||||
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
||||
ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid);
|
||||
UnmarkObjectDistributed(dbAddress);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as
|
||||
|
|
|
@ -240,6 +240,9 @@ extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString
|
|||
extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
|
||||
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
|
||||
|
||||
|
||||
/* domain.c - forward declarations */
|
||||
extern List * CreateDomainStmtObjectAddress(Node *node, bool missing_ok, bool
|
||||
|
@ -510,6 +513,7 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool
|
|||
|
||||
extern void UnmarkRolesDistributed(List *roles);
|
||||
extern List * FilterDistributedRoles(List *roles);
|
||||
extern void EnsureSequentialModeForRoleDDL(void);
|
||||
|
||||
/* schema.c - forward declarations */
|
||||
extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString);
|
||||
|
|
|
@ -94,6 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
|
|||
extern void MarkInvalidateForeignKeyGraph(void);
|
||||
extern void InvalidateForeignKeyGraphForDDL(void);
|
||||
extern List * DDLTaskList(Oid relationId, const char *commandString);
|
||||
extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands );
|
||||
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
|
||||
extern bool AlterTableInProgress(void);
|
||||
extern bool DropSchemaOrDBInProgress(void);
|
||||
|
|
|
@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
|||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||
extern void UnmarkRolesAndDatabaseDistributed(Node *node);
|
||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||
extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target);
|
||||
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,
|
||||
|
|
Loading…
Reference in New Issue