mirror of https://github.com/citusdata/citus.git
Fixes create and drop database transaction use
parent
c9dae2684f
commit
73f0db2aed
|
@ -342,30 +342,3 @@ DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, bool isPostproces
|
||||||
return objectAddresses;
|
return objectAddresses;
|
||||||
}
|
}
|
||||||
|
|
||||||
void UnmarkRolesAndDatabaseDistributed(Node *node)
|
|
||||||
{
|
|
||||||
if (IsA(node, DropRoleStmt))
|
|
||||||
{
|
|
||||||
DropRoleStmt *stmt = castNode(DropRoleStmt, node);
|
|
||||||
List *allDropRoles = stmt->roles;
|
|
||||||
|
|
||||||
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
|
|
||||||
if (list_length(distributedDropRoles) > 0)
|
|
||||||
{
|
|
||||||
UnmarkRolesDistributed(distributedDropRoles);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
else if (IsA(node, DropdbStmt))
|
|
||||||
{
|
|
||||||
elog(LOG, "Unmarking database1 as distributed");
|
|
||||||
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
|
||||||
char *dbName = stmt->dbname;
|
|
||||||
|
|
||||||
Oid dbOid = get_database_oid(dbName, stmt->missing_ok);
|
|
||||||
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
|
||||||
ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid);
|
|
||||||
UnmarkObjectDistributed(dbAddress);
|
|
||||||
elog(LOG, "Unmarking database %s as distributed", dbName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -273,16 +273,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
||||||
|
|
||||||
char *createDatabaseCommand = DeparseTreeNode(node);
|
char *createDatabaseCommand = DeparseTreeNode(node);
|
||||||
|
|
||||||
StringInfo internalCreateCommand = makeStringInfo();
|
|
||||||
appendStringInfo(internalCreateCommand,
|
|
||||||
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
|
||||||
quote_literal_cstr(createDatabaseCommand));
|
|
||||||
|
|
||||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
(void *) internalCreateCommand->data,
|
(void *) createDatabaseCommand,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -356,55 +351,59 @@ List *
|
||||||
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext)
|
ProcessUtilityContext processUtilityContext)
|
||||||
{
|
{
|
||||||
|
bool isPostProcess = false;
|
||||||
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
EnsureSequentialModeForRoleDDL();
|
|
||||||
|
|
||||||
DropdbStmt *stmt = (DropdbStmt *) node;
|
DropdbStmt *stmt = (DropdbStmt *) node;
|
||||||
|
|
||||||
Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok);
|
List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, isPostProcess);
|
||||||
|
|
||||||
if (databaseOid == InvalidOid)
|
if (list_length(addresses) == 0)
|
||||||
{
|
{
|
||||||
/* let regular ProcessUtility deal with IF NOT EXISTS */
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ObjectAddress *address = (ObjectAddress *) linitial(addresses);
|
||||||
ObjectAddress dbAddress = { 0 };
|
if (address->objectId == InvalidOid ||!IsObjectDistributed(address))
|
||||||
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
|
||||||
if (!IsObjectDistributed(&dbAddress))
|
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *dropDatabaseCommand = DeparseTreeNode(node);
|
char *dropDatabaseCommand = DeparseTreeNode(node);
|
||||||
|
|
||||||
StringInfo internalDropCommand = makeStringInfo();
|
|
||||||
appendStringInfo(internalDropCommand,
|
|
||||||
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
|
||||||
quote_literal_cstr(dropDatabaseCommand));
|
|
||||||
|
|
||||||
|
|
||||||
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
(void *) internalDropCommand->data,
|
(void *) dropDatabaseCommand,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ObjectAddress *GetDatabaseAddressFromDatabaseName(char *databaseName)
|
||||||
|
{
|
||||||
|
Oid databaseOid = get_database_oid(databaseName, false);
|
||||||
|
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
|
||||||
|
return dbAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *
|
||||||
|
DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||||
|
{
|
||||||
|
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||||
|
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname);
|
||||||
|
return list_make1(dbAddress);
|
||||||
|
}
|
||||||
|
|
||||||
List *
|
List *
|
||||||
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
|
||||||
{
|
{
|
||||||
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||||
Oid databaseOid = get_database_oid(stmt->dbname, missing_ok);
|
ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname);
|
||||||
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
|
||||||
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
|
|
||||||
|
|
||||||
return list_make1(dbAddress);
|
return list_make1(dbAddress);
|
||||||
}
|
}
|
||||||
|
|
|
@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = {
|
||||||
.postprocess = NULL,
|
.postprocess = NULL,
|
||||||
.objectType = OBJECT_DATABASE,
|
.objectType = OBJECT_DATABASE,
|
||||||
.operationType = DIST_OPS_DROP,
|
.operationType = DIST_OPS_DROP,
|
||||||
.address = NULL,
|
.address = DropDatabaseStmtObjectAddress,
|
||||||
.markDistributed = false,
|
.markDistributed = false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -729,7 +729,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make sure that dropping the role and database deletes the pg_dist_object entries. There is a
|
* Make sure that dropping the role and database deletes the pg_dist_object entries. There is a
|
||||||
* separate logic for roles and database, since roles database are not included as dropped objects in the
|
* separate logic for roles and database, since roles and database are not included as dropped objects in the
|
||||||
* drop event trigger. To handle it both on worker and coordinator nodes, it is not
|
* drop event trigger. To handle it both on worker and coordinator nodes, it is not
|
||||||
* implemented as a part of process functions but here.
|
* implemented as a part of process functions but here.
|
||||||
*/
|
*/
|
||||||
|
@ -1482,6 +1482,24 @@ DDLTaskList(Oid relationId, const char *commandString)
|
||||||
return taskList;
|
return taskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a
|
||||||
|
* given target set of nodes with cannotBeExecutedInTransction is set to make sure
|
||||||
|
* that list is being executed without a transaction.
|
||||||
|
*/
|
||||||
|
List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands ){
|
||||||
|
List *ddlJobs = NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
|
DDLJob *ddlJob = NULL;
|
||||||
|
foreach_ptr(ddlJob, ddlJobs)
|
||||||
|
{
|
||||||
|
Task *task = NULL;
|
||||||
|
foreach_ptr(task, ddlJob->taskList)
|
||||||
|
{
|
||||||
|
task->cannotBeExecutedInTransction = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ddlJobs;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodeDDLTaskList builds a list of tasks to execute a DDL command on a
|
* NodeDDLTaskList builds a list of tasks to execute a DDL command on a
|
||||||
|
|
|
@ -48,6 +48,8 @@
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/regproc.h"
|
#include "utils/regproc.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
#include "catalog/pg_database.h"
|
||||||
|
#include "commands/dbcommands.h"
|
||||||
|
|
||||||
|
|
||||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||||
|
@ -355,6 +357,31 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||||
return spiStatus;
|
return spiStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void UnmarkRolesAndDatabaseDistributed(Node *node)
|
||||||
|
{
|
||||||
|
if (IsA(node, DropRoleStmt))
|
||||||
|
{
|
||||||
|
DropRoleStmt *stmt = castNode(DropRoleStmt, node);
|
||||||
|
List *allDropRoles = stmt->roles;
|
||||||
|
|
||||||
|
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
|
||||||
|
if (list_length(distributedDropRoles) > 0)
|
||||||
|
{
|
||||||
|
UnmarkRolesDistributed(distributedDropRoles);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
else if (IsA(node, DropdbStmt))
|
||||||
|
{
|
||||||
|
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||||
|
char *dbName = stmt->dbname;
|
||||||
|
|
||||||
|
Oid dbOid = get_database_oid(dbName, stmt->missing_ok);
|
||||||
|
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid);
|
||||||
|
UnmarkObjectDistributed(dbAddress);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as
|
* UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as
|
||||||
|
|
|
@ -193,7 +193,6 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo
|
||||||
isPostprocess);
|
isPostprocess);
|
||||||
extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
|
extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
|
||||||
isPostprocess);
|
isPostprocess);
|
||||||
extern void UnmarkRolesAndDatabaseDistributed(Node *node);
|
|
||||||
|
|
||||||
/* index.c */
|
/* index.c */
|
||||||
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
|
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
|
||||||
|
@ -241,6 +240,8 @@ extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString
|
||||||
extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString);
|
extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString);
|
||||||
extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext);
|
ProcessUtilityContext processUtilityContext);
|
||||||
|
extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
|
||||||
|
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess);
|
||||||
|
|
||||||
|
|
||||||
/* domain.c - forward declarations */
|
/* domain.c - forward declarations */
|
||||||
|
|
|
@ -94,6 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
|
||||||
extern void MarkInvalidateForeignKeyGraph(void);
|
extern void MarkInvalidateForeignKeyGraph(void);
|
||||||
extern void InvalidateForeignKeyGraphForDDL(void);
|
extern void InvalidateForeignKeyGraphForDDL(void);
|
||||||
extern List * DDLTaskList(Oid relationId, const char *commandString);
|
extern List * DDLTaskList(Oid relationId, const char *commandString);
|
||||||
|
extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands );
|
||||||
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
|
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
|
||||||
extern bool AlterTableInProgress(void);
|
extern bool AlterTableInProgress(void);
|
||||||
extern bool DropSchemaOrDBInProgress(void);
|
extern bool DropSchemaOrDBInProgress(void);
|
||||||
|
|
|
@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||||
|
extern void UnmarkRolesAndDatabaseDistributed(Node *node);
|
||||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||||
extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target);
|
extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target);
|
||||||
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,
|
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,
|
||||||
|
|
Loading…
Reference in New Issue