mirror of https://github.com/citusdata/citus.git
Adds first commit
parent
0dca65c84d
commit
226696e42a
|
@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
|
||||||
DATA_built = $(generated_sql_files)
|
DATA_built = $(generated_sql_files)
|
||||||
|
|
||||||
# directories with source files
|
# directories with source files
|
||||||
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
|
SUBDIRS = . commands connection database ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
|
||||||
# enterprise modules
|
# enterprise modules
|
||||||
SUBDIRS += replication
|
SUBDIRS += replication
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "nodes/parsenodes.h"
|
#include "nodes/parsenodes.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
|
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
|
@ -28,13 +29,42 @@
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
#include "distributed/deparser.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "distributed/metadata/distobject.h"
|
||||||
|
#include "distributed/database/database_sharding.h"
|
||||||
|
#include "distributed/deparse_shard_query.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* macros to add DefElems to a list */
|
||||||
|
#define DEFELEM_ADD_STRING(options, key, value) { \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *) makeString(value), -1); \
|
||||||
|
options = lappend(options, elem); \
|
||||||
|
}
|
||||||
|
|
||||||
|
#define DEFELEM_ADD_BOOL(options, key, value) { \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *) makeBoolean(value), -1); \
|
||||||
|
options = lappend(options, elem); \
|
||||||
|
}
|
||||||
|
|
||||||
|
#define DEFELEM_ADD_INT(options, key, value) { \
|
||||||
|
DefElem *elem = makeDefElem(key, (Node *) makeInteger(value), -1); \
|
||||||
|
options = lappend(options, elem); \
|
||||||
|
}
|
||||||
|
|
||||||
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
||||||
|
|
||||||
|
static List * CreateDDLTaskList(char *command, List *workerNodeList,
|
||||||
|
bool outsideTransaction);
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
||||||
static Oid get_database_owner(Oid db_oid);
|
static Oid get_database_owner(Oid db_oid);
|
||||||
List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext);
|
ProcessUtilityContext processUtilityContext);
|
||||||
|
|
||||||
/* controlled via GUC */
|
/* controlled via GUC */
|
||||||
|
bool EnableCreateDatabasePropagation = true;
|
||||||
bool EnableAlterDatabaseOwner = true;
|
bool EnableAlterDatabaseOwner = true;
|
||||||
|
|
||||||
|
|
||||||
|
@ -148,6 +178,78 @@ PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_internal_database_command is an internal UDF to
|
||||||
|
* create/drop a database in an idempotent maner without
|
||||||
|
* transaction block restrictions.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_internal_database_command(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *commandText = PG_GETARG_TEXT_P(0);
|
||||||
|
char *command = text_to_cstring(commandText);
|
||||||
|
Node *parseTree = ParseTreeNode(command);
|
||||||
|
|
||||||
|
set_config_option("citus.enable_ddl_propagation", "off",
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
|
||||||
|
set_config_option("citus.enable_create_database_propagation", "off",
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
|
||||||
|
if (IsA(parseTree, CreatedbStmt))
|
||||||
|
{
|
||||||
|
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
|
||||||
|
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
||||||
|
|
||||||
|
if (!OidIsValid(databaseOid))
|
||||||
|
{
|
||||||
|
createdb(NULL, (CreatedbStmt *) parseTree);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* TODO: check database properties */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(parseTree, DropdbStmt))
|
||||||
|
{
|
||||||
|
DropdbStmt *stmt = castNode(DropdbStmt, parseTree);
|
||||||
|
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
|
||||||
|
|
||||||
|
if (!OidIsValid(databaseOid))
|
||||||
|
{
|
||||||
|
/* already dropped? */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* remove database from pg_dist_object */
|
||||||
|
ObjectAddress dbAddress = { 0 };
|
||||||
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
|
|
||||||
|
if (IsObjectDistributed(&dbAddress))
|
||||||
|
{
|
||||||
|
UnmarkObjectDistributed(&dbAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* remove database from database shards */
|
||||||
|
DeleteDatabaseShardByDatabaseIdLocally(databaseOid);
|
||||||
|
|
||||||
|
DropDatabase(NULL, (DropdbStmt *) parseTree);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));
|
||||||
|
}
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
* PreprocessAlterDatabaseStmt is executed before the statement is applied to the local
|
||||||
|
@ -213,6 +315,35 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateDDLTaskList creates a task list for running a single DDL command.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
CreateDDLTaskList(char *command, List *workerNodeList, bool outsideTransaction)
|
||||||
|
{
|
||||||
|
List *commandList = list_make3(DISABLE_DDL_PROPAGATION,
|
||||||
|
command,
|
||||||
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
Task *task = CitusMakeNode(Task);
|
||||||
|
task->taskType = DDL_TASK;
|
||||||
|
SetTaskQueryStringList(task, commandList);
|
||||||
|
task->cannotBeExecutedInTransction = outsideTransaction;
|
||||||
|
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
|
{
|
||||||
|
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
|
||||||
|
targetPlacement->nodeName = workerNode->workerName;
|
||||||
|
targetPlacement->nodePort = workerNode->workerPort;
|
||||||
|
targetPlacement->groupId = workerNode->groupId;
|
||||||
|
task->taskPlacementList = lappend(task->taskPlacementList,
|
||||||
|
targetPlacement);
|
||||||
|
}
|
||||||
|
|
||||||
|
return list_make1(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
|
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
|
||||||
|
@ -242,3 +373,111 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE
|
||||||
|
* across nodes. We use the cannotBeExecutedInTransction option to avoid
|
||||||
|
* u* sending transaction blocks.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
||||||
|
{
|
||||||
|
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
||||||
|
{
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||||
|
char *databaseName = stmt->dbname;
|
||||||
|
bool missingOk = false;
|
||||||
|
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TODO: try to reuse regular DDL infrastructure
|
||||||
|
*
|
||||||
|
* We do not do this right now because of the AssignDatabaseToShard at the end.
|
||||||
|
*/
|
||||||
|
List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, RowShareLock);
|
||||||
|
if (list_length(workerNodes) > 0)
|
||||||
|
{
|
||||||
|
char *createDatabaseCommand = DeparseTreeNode(node);
|
||||||
|
|
||||||
|
StringInfo internalCreateCommand = makeStringInfo();
|
||||||
|
appendStringInfo(internalCreateCommand,
|
||||||
|
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
||||||
|
quote_literal_cstr(createDatabaseCommand));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For the moment, we run CREATE DATABASE in 2PC, though that prevents
|
||||||
|
* us from immediately doing a pg_dump | pg_restore when dealing with
|
||||||
|
* a remote template database.
|
||||||
|
*/
|
||||||
|
bool outsideTransaction = false;
|
||||||
|
|
||||||
|
List *taskList = CreateDDLTaskList(internalCreateCommand->data, workerNodes,
|
||||||
|
outsideTransaction);
|
||||||
|
|
||||||
|
bool localExecutionSupported = false;
|
||||||
|
ExecuteUtilityTaskList(taskList, localExecutionSupported);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* synchronize pg_dist_object records */
|
||||||
|
ObjectAddress dbAddress = { 0 };
|
||||||
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
|
MarkObjectDistributed(&dbAddress);
|
||||||
|
|
||||||
|
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *
|
||||||
|
PreprocessDropDatabaseStmt(Node *node, const char *queryString,
|
||||||
|
ProcessUtilityContext processUtilityContext)
|
||||||
|
{
|
||||||
|
if (!EnableCreateDatabasePropagation || !ShouldPropagate())
|
||||||
|
{
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
DropdbStmt *stmt = (DropdbStmt *) node;
|
||||||
|
char *databaseName = stmt->dbname;
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(databaseName, missingOk);
|
||||||
|
if (databaseOid == InvalidOid)
|
||||||
|
{
|
||||||
|
/* let regular ProcessUtility deal with IF NOT EXISTS */
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectAddress dbAddress = { 0 };
|
||||||
|
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
|
||||||
|
if (!IsObjectDistributed(&dbAddress))
|
||||||
|
{
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, RowShareLock);
|
||||||
|
if (list_length(workerNodes) == 0)
|
||||||
|
{
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *dropDatabaseCommand = DeparseTreeNode(node);
|
||||||
|
|
||||||
|
StringInfo internalDropCommand = makeStringInfo();
|
||||||
|
appendStringInfo(internalDropCommand,
|
||||||
|
"SELECT pg_catalog.citus_internal_database_command(%s)",
|
||||||
|
quote_literal_cstr(dropDatabaseCommand));
|
||||||
|
|
||||||
|
|
||||||
|
/* we execute here to avoid EnsureCoordinator check in ExecuteDistributedDDLJob */
|
||||||
|
bool outsideTransaction = false;
|
||||||
|
List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes,
|
||||||
|
outsideTransaction);
|
||||||
|
|
||||||
|
bool localExecutionSupported = false;
|
||||||
|
ExecuteUtilityTaskList(taskList, localExecutionSupported);
|
||||||
|
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
|
@ -466,6 +466,28 @@ static DistributeObjectOps Database_Alter = {
|
||||||
.markDistributed = false,
|
.markDistributed = false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static DistributeObjectOps Database_Create = {
|
||||||
|
.deparse = DeparseCreateDatabaseStmt,
|
||||||
|
.qualify = NULL,
|
||||||
|
.preprocess = NULL,
|
||||||
|
.postprocess = PostprocessCreateDatabaseStmt,
|
||||||
|
.objectType = OBJECT_DATABASE,
|
||||||
|
.operationType = DIST_OPS_CREATE,
|
||||||
|
.address = NULL,
|
||||||
|
.markDistributed = false,
|
||||||
|
};
|
||||||
|
|
||||||
|
static DistributeObjectOps Database_Drop = {
|
||||||
|
.deparse = DeparseDropDatabaseStmt,
|
||||||
|
.qualify = NULL,
|
||||||
|
.preprocess = PreprocessDropDatabaseStmt,
|
||||||
|
.postprocess = NULL,
|
||||||
|
.objectType = OBJECT_DATABASE,
|
||||||
|
.operationType = DIST_OPS_DROP,
|
||||||
|
.address = NULL,
|
||||||
|
.markDistributed = false,
|
||||||
|
};
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
static DistributeObjectOps Database_RefreshColl = {
|
static DistributeObjectOps Database_RefreshColl = {
|
||||||
.deparse = DeparseAlterDatabaseRefreshCollStmt,
|
.deparse = DeparseAlterDatabaseRefreshCollStmt,
|
||||||
|
@ -1333,6 +1355,15 @@ GetDistributeObjectOps(Node *node)
|
||||||
{
|
{
|
||||||
return &Database_Alter;
|
return &Database_Alter;
|
||||||
}
|
}
|
||||||
|
case T_CreatedbStmt:
|
||||||
|
{
|
||||||
|
return &Database_Create;
|
||||||
|
}
|
||||||
|
|
||||||
|
case T_DropdbStmt:
|
||||||
|
{
|
||||||
|
return &Database_Drop;
|
||||||
|
}
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
case T_AlterDatabaseRefreshCollStmt:
|
case T_AlterDatabaseRefreshCollStmt:
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/*
|
||||||
|
* database_sharding.h
|
||||||
|
*
|
||||||
|
* Data structure definition for managing backend data and related function
|
||||||
|
*
|
||||||
|
* Copyright (c) Microsoft, Inc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef DATABASE_SHARDING_H
|
||||||
|
#define DATABASE_SHARDING_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "tcop/utility.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* attributes of citus_catalog.database_shard */
|
||||||
|
#define Natts_database_shard 3
|
||||||
|
#define Anum_database_shard_database_id 1
|
||||||
|
#define Anum_database_shard_node_group_id 2
|
||||||
|
#define Anum_database_shard_is_available 3
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct DatabaseShard
|
||||||
|
{
|
||||||
|
/* database oid */
|
||||||
|
Oid databaseOid;
|
||||||
|
|
||||||
|
/* node group on which the database shard is placed */
|
||||||
|
int nodeGroupId;
|
||||||
|
|
||||||
|
/* whether the database shard is available */
|
||||||
|
bool isAvailable;
|
||||||
|
} DatabaseShard;
|
||||||
|
|
||||||
|
/* citus.enable_database_sharding setting */
|
||||||
|
extern bool EnableDatabaseSharding;
|
||||||
|
|
||||||
|
void PreProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString,
|
||||||
|
ProcessUtilityContext context,
|
||||||
|
bool *runPreviousUtilityHook);
|
||||||
|
void PostProcessUtilityInDatabaseShard(Node *parseTree, const char *queryString,
|
||||||
|
ProcessUtilityContext context);
|
||||||
|
bool DatabaseShardingEnabled(void);
|
||||||
|
void AssignDatabaseToShard(Oid databaseOid);
|
||||||
|
void UpdateDatabaseShard(Oid databaseOid, int targetNodeGroupId);
|
||||||
|
void DeleteDatabaseShardByDatabaseIdLocally(Oid databaseOid);
|
||||||
|
DatabaseShard * GetDatabaseShardByOid(Oid databaseOid);
|
||||||
|
List * ListDatabaseShards(void);
|
||||||
|
int64 CitusDatabaseSize(Oid databaseId);
|
||||||
|
char * InsertDatabaseShardAssignmentCommand(Oid databaseOid, int nodeGroupId);
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
|
@ -24,7 +24,7 @@
|
||||||
#include "distributed/deparser.h"
|
#include "distributed/deparser.h"
|
||||||
#include "distributed/log_utils.h"
|
#include "distributed/log_utils.h"
|
||||||
#include "parser/parse_type.h"
|
#include "parser/parse_type.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
|
|
||||||
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
|
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
|
||||||
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
|
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
|
||||||
|
@ -34,7 +34,7 @@ char *
|
||||||
DeparseAlterDatabaseOwnerStmt(Node *node)
|
DeparseAlterDatabaseOwnerStmt(Node *node)
|
||||||
{
|
{
|
||||||
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
|
||||||
StringInfoData str = { 0 };
|
StringInfoData str = {0};
|
||||||
initStringInfo(&str);
|
initStringInfo(&str);
|
||||||
|
|
||||||
Assert(stmt->objectType == OBJECT_DATABASE);
|
Assert(stmt->objectType == OBJECT_DATABASE);
|
||||||
|
@ -44,7 +44,6 @@ DeparseAlterDatabaseOwnerStmt(Node *node)
|
||||||
return str.data;
|
return str.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
|
AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
|
||||||
{
|
{
|
||||||
|
@ -52,18 +51,17 @@ AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
|
||||||
|
|
||||||
appendStringInfo(buf,
|
appendStringInfo(buf,
|
||||||
"ALTER DATABASE %s OWNER TO %s;",
|
"ALTER DATABASE %s OWNER TO %s;",
|
||||||
quote_identifier(strVal((String *) stmt->object)),
|
quote_identifier(strVal((String *)stmt->object)),
|
||||||
RoleSpecString(stmt->newowner, true));
|
RoleSpecString(stmt->newowner, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AppendGrantDatabases(StringInfo buf, GrantStmt *stmt)
|
AppendGrantDatabases(StringInfo buf, GrantStmt *stmt)
|
||||||
{
|
{
|
||||||
ListCell *cell = NULL;
|
ListCell *cell = NULL;
|
||||||
appendStringInfo(buf, " ON DATABASE ");
|
appendStringInfo(buf, " ON DATABASE ");
|
||||||
|
|
||||||
foreach(cell, stmt->objects)
|
foreach (cell, stmt->objects)
|
||||||
{
|
{
|
||||||
char *database = strVal(lfirst(cell));
|
char *database = strVal(lfirst(cell));
|
||||||
appendStringInfoString(buf, quote_identifier(database));
|
appendStringInfoString(buf, quote_identifier(database));
|
||||||
|
@ -74,7 +72,6 @@ AppendGrantDatabases(StringInfo buf, GrantStmt *stmt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
|
AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
|
||||||
{
|
{
|
||||||
|
@ -87,14 +84,12 @@ AppendGrantOnDatabaseStmt(StringInfo buf, GrantStmt *stmt)
|
||||||
AppendGrantSharedSuffix(buf, stmt);
|
AppendGrantSharedSuffix(buf, stmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
|
AppendDefElemConnLimit(StringInfo buf, DefElem *def)
|
||||||
{
|
{
|
||||||
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int) defGetNumeric(def));
|
appendStringInfo(buf, " CONNECTION LIMIT %ld", (long int)defGetNumeric(def));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||||
{
|
{
|
||||||
|
@ -104,7 +99,7 @@ AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||||
{
|
{
|
||||||
ListCell *cell = NULL;
|
ListCell *cell = NULL;
|
||||||
appendStringInfo(buf, "WITH ");
|
appendStringInfo(buf, "WITH ");
|
||||||
foreach(cell, stmt->options)
|
foreach (cell, stmt->options)
|
||||||
{
|
{
|
||||||
DefElem *def = castNode(DefElem, lfirst(cell));
|
DefElem *def = castNode(DefElem, lfirst(cell));
|
||||||
if (strcmp(def->defname, "is_template") == 0)
|
if (strcmp(def->defname, "is_template") == 0)
|
||||||
|
@ -133,14 +128,13 @@ AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt)
|
||||||
appendStringInfo(buf, ";");
|
appendStringInfo(buf, ";");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
char *
|
char *
|
||||||
DeparseGrantOnDatabaseStmt(Node *node)
|
DeparseGrantOnDatabaseStmt(Node *node)
|
||||||
{
|
{
|
||||||
GrantStmt *stmt = castNode(GrantStmt, node);
|
GrantStmt *stmt = castNode(GrantStmt, node);
|
||||||
Assert(stmt->objtype == OBJECT_DATABASE);
|
Assert(stmt->objtype == OBJECT_DATABASE);
|
||||||
|
|
||||||
StringInfoData str = { 0 };
|
StringInfoData str = {0};
|
||||||
initStringInfo(&str);
|
initStringInfo(&str);
|
||||||
|
|
||||||
AppendGrantOnDatabaseStmt(&str, stmt);
|
AppendGrantOnDatabaseStmt(&str, stmt);
|
||||||
|
@ -148,13 +142,12 @@ DeparseGrantOnDatabaseStmt(Node *node)
|
||||||
return str.data;
|
return str.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
char *
|
char *
|
||||||
DeparseAlterDatabaseStmt(Node *node)
|
DeparseAlterDatabaseStmt(Node *node)
|
||||||
{
|
{
|
||||||
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
AlterDatabaseStmt *stmt = castNode(AlterDatabaseStmt, node);
|
||||||
|
|
||||||
StringInfoData str = { 0 };
|
StringInfoData str = {0};
|
||||||
initStringInfo(&str);
|
initStringInfo(&str);
|
||||||
|
|
||||||
AppendAlterDatabaseStmt(&str, stmt);
|
AppendAlterDatabaseStmt(&str, stmt);
|
||||||
|
@ -162,12 +155,11 @@ DeparseAlterDatabaseStmt(Node *node)
|
||||||
return str.data;
|
return str.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
char *
|
char *
|
||||||
DeparseAlterDatabaseRefreshCollStmt(Node *node)
|
DeparseAlterDatabaseRefreshCollStmt(Node *node)
|
||||||
{
|
{
|
||||||
AlterDatabaseRefreshCollStmt *stmt = (AlterDatabaseRefreshCollStmt *) node;
|
AlterDatabaseRefreshCollStmt *stmt = (AlterDatabaseRefreshCollStmt *)node;
|
||||||
|
|
||||||
StringInfoData str;
|
StringInfoData str;
|
||||||
initStringInfo(&str);
|
initStringInfo(&str);
|
||||||
|
@ -179,7 +171,6 @@ DeparseAlterDatabaseRefreshCollStmt(Node *node)
|
||||||
return str.data;
|
return str.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -192,16 +183,193 @@ AppendAlterDatabaseSetStmt(StringInfo buf, AlterDatabaseSetStmt *stmt)
|
||||||
AppendVariableSet(buf, varSetStmt);
|
AppendVariableSet(buf, varSetStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
char *
|
char *
|
||||||
DeparseAlterDatabaseSetStmt(Node *node)
|
DeparseAlterDatabaseSetStmt(Node *node)
|
||||||
{
|
{
|
||||||
AlterDatabaseSetStmt *stmt = castNode(AlterDatabaseSetStmt, node);
|
AlterDatabaseSetStmt *stmt = castNode(AlterDatabaseSetStmt, node);
|
||||||
|
|
||||||
StringInfoData str = { 0 };
|
StringInfoData str = {0};
|
||||||
initStringInfo(&str);
|
initStringInfo(&str);
|
||||||
|
|
||||||
AppendAlterDatabaseSetStmt(&str, stmt);
|
AppendAlterDatabaseSetStmt(&str, stmt);
|
||||||
|
|
||||||
return str.data;
|
return str.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *
|
||||||
|
DeparseCreateDatabaseSetStmt(Node *node)
|
||||||
|
{
|
||||||
|
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||||
|
StringInfoData str = {0};
|
||||||
|
initStringInfo(&str);
|
||||||
|
|
||||||
|
AppendCreatedbStmt(&str, stmt);
|
||||||
|
|
||||||
|
return str.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
AppendCreatedbStmt(StringInfo buf, CreatedbStmt *stmt)
|
||||||
|
{
|
||||||
|
appendStringInfo(buf,
|
||||||
|
"CREATE DATABASE %s",
|
||||||
|
quote_identifier(stmt->dbname));
|
||||||
|
|
||||||
|
DefElem *option = NULL;
|
||||||
|
|
||||||
|
foreach_ptr(option, stmt->options)
|
||||||
|
{
|
||||||
|
if (strcmp(option->defname, "tablespace") == 0)
|
||||||
|
{
|
||||||
|
char *tablespaceName = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " TABLESPACE %s",
|
||||||
|
quote_identifier(tablespaceName));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "owner") == 0)
|
||||||
|
{
|
||||||
|
char *owner = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " OWNER %s",
|
||||||
|
quote_identifier(owner));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "template") == 0)
|
||||||
|
{
|
||||||
|
char *template = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " TEMPLATE %s",
|
||||||
|
quote_identifier(template));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "encoding") == 0)
|
||||||
|
{
|
||||||
|
char *encoding = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " ENCODING %s",
|
||||||
|
quote_literal_cstr(encoding));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "locale") == 0)
|
||||||
|
{
|
||||||
|
char *locale = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " LOCALE %s",
|
||||||
|
quote_literal_cstr(locale));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "lc_collate") == 0)
|
||||||
|
{
|
||||||
|
char *lc_collate = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " LC_COLLATE %s",
|
||||||
|
quote_literal_cstr(lc_collate));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "lc_ctype") == 0)
|
||||||
|
{
|
||||||
|
char *lc_ctype = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " LC_CTYPE %s",
|
||||||
|
quote_literal_cstr(lc_ctype));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "icu_locale") == 0)
|
||||||
|
{
|
||||||
|
char *icuLocale = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " ICU_LOCALE %s",
|
||||||
|
quote_literal_cstr(icuLocale));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "locale_provider") == 0)
|
||||||
|
{
|
||||||
|
char *localeProvider = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " LOCALE_PROVIDER %s",
|
||||||
|
quote_literal_cstr(localeProvider));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "is_template") == 0)
|
||||||
|
{
|
||||||
|
bool isTemplate = defGetBoolean(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " IS_TEMPLATE %s",
|
||||||
|
isTemplate ? "true" : "false");
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "allow_connections") == 0)
|
||||||
|
{
|
||||||
|
bool allowConnections = defGetBoolean(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " ALLOW_CONNECTIONS %s",
|
||||||
|
allowConnections ? "true" : "false");
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "connection_limit") == 0)
|
||||||
|
{
|
||||||
|
int connectionLimit = defGetInt32(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " CONNECTION_LIMIT %d", connectionLimit);
|
||||||
|
}
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
else if (strcmp(option->defname, "collation_version") == 0)
|
||||||
|
{
|
||||||
|
char *collationVersion = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " COLLATION_VERSION %s",
|
||||||
|
quote_literal_cstr(collationVersion));
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "oid") == 0)
|
||||||
|
{
|
||||||
|
Oid objectId = defGetObjectId(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " OID %d", objectId);
|
||||||
|
}
|
||||||
|
else if (strcmp(option->defname, "strategy") == 0)
|
||||||
|
{
|
||||||
|
char *strategy = defGetString(option);
|
||||||
|
|
||||||
|
appendStringInfo(buf, " STRATEGY %s",
|
||||||
|
quote_literal_cstr(strategy));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
else if (strcmp(option->defname, "location") == 0)
|
||||||
|
{
|
||||||
|
/* deprecated option */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("unrecognized CREATE DATABASE option \"%s\"",
|
||||||
|
option->defname)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char *
|
||||||
|
DeparseDropDatabaseStmt(Node *node)
|
||||||
|
{
|
||||||
|
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||||
|
StringInfoData str = { 0 };
|
||||||
|
initStringInfo(&str);
|
||||||
|
|
||||||
|
AppendDropDatabaseStmt(&str, stmt);
|
||||||
|
|
||||||
|
return str.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt)
|
||||||
|
{
|
||||||
|
appendStringInfo(buf,
|
||||||
|
"DROP DATABASE %s",
|
||||||
|
quote_identifier(stmt->dbname));
|
||||||
|
|
||||||
|
DefElem *option = NULL;
|
||||||
|
|
||||||
|
foreach_ptr(option, stmt->options)
|
||||||
|
{
|
||||||
|
if (strcmp(option->defname, "force") == 0)
|
||||||
|
{
|
||||||
|
appendStringInfo(buf, " FORCE");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("unrecognized DROP DATABASE option \"%s\"",
|
||||||
|
option->defname)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,3 +1,14 @@
|
||||||
-- citus--12.1-1--12.2-1
|
-- citus--12.1-1--12.2-1
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_internal_database_command creates a database according to the given command.
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
|
||||||
|
'run a database command without transaction block restrictions';
|
||||||
|
|
||||||
-- bump version to 12.2-1
|
-- bump version to 12.2-1
|
||||||
|
|
|
@ -1,2 +1,3 @@
|
||||||
-- citus--12.2-1--12.1-1
|
-- citus--12.2-1--12.1-1
|
||||||
|
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
||||||
-- this is an empty downgrade path since citus--12.2-1--12.1-1.sql is empty for now
|
-- this is an empty downgrade path since citus--12.2-1--12.1-1.sql is empty for now
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -227,6 +227,9 @@ extern char * DeparseGrantOnDatabaseStmt(Node *node);
|
||||||
extern char * DeparseAlterDatabaseStmt(Node *node);
|
extern char * DeparseAlterDatabaseStmt(Node *node);
|
||||||
extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
|
extern char * DeparseAlterDatabaseRefreshCollStmt(Node *node);
|
||||||
extern char * DeparseAlterDatabaseSetStmt(Node *node);
|
extern char * DeparseAlterDatabaseSetStmt(Node *node);
|
||||||
|
extern char * DeparseCreateDatabaseStmt(Node *node);
|
||||||
|
extern char * DeparseDropDatabaseStmt(Node *node);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* forward declaration for deparse_publication_stmts.c */
|
/* forward declaration for deparse_publication_stmts.c */
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
extern bool ObjectExists(const ObjectAddress *address);
|
extern bool ObjectExists(const ObjectAddress *address);
|
||||||
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
||||||
extern bool IsAnyObjectDistributed(const List *addresses);
|
extern bool IsAnyObjectDistributed(const List *addresses);
|
||||||
|
extern bool IsObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
|
|
Loading…
Reference in New Issue