Fixes review items

pull/7240/head
gindibay 2023-11-08 02:02:00 +03:00
parent 595d078f95
commit 65660db10d
16 changed files with 310 additions and 205 deletions

View File

@ -14,8 +14,10 @@
#include "postgres.h" #include "postgres.h"
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "catalog/pg_database.h"
#include "catalog/pg_ts_config.h" #include "catalog/pg_ts_config.h"
#include "catalog/pg_ts_dict.h" #include "catalog/pg_ts_dict.h"
#include "commands/dbcommands.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "tcop/utility.h" #include "tcop/utility.h"
@ -28,8 +30,6 @@
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
/* /*

View File

@ -10,46 +10,53 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_database.h" #include "catalog/pg_database.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_tablespace.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "miscadmin.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "utils/syscache.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h" #include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h"
#include "distributed/deparser.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/metadata/distobject.h" #include "distributed/worker_transaction.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/adaptive_executor.h"
#include "access/htup_details.h"
#include "catalog/pg_tablespace.h"
#include "access/heapam.h"
#include "utils/relcache.h"
#include "utils/rel.h"
#include "utils/lsyscache.h"
#include "catalog/pg_collation.h"
#include "utils/relcache.h"
#include "catalog/pg_database_d.h"
/*
* DatabaseCollationInfo is used to store collation related information of a database
*/
typedef struct DatabaseCollationInfo
{
char *collation;
char *ctype;
#if PG_VERSION_NUM >= PG_VERSION_15
char *icu_locale;
char *collversion;
#endif
} DatabaseCollationInfo;
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
PG_FUNCTION_INFO_V1(citus_internal_database_command);
static Oid get_database_owner(Oid db_oid); static Oid get_database_owner(Oid db_oid);
List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString, List * PreprocessGrantOnDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
@ -264,6 +271,13 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
} }
/*
* PostprocessAlterDatabaseStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage, we can perform validations and prepare the commands that need to
* be run on all workers to grant.
*/
List * List *
PreprocessCreateDatabaseStmt(Node *node, const char *queryString, PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext) ProcessUtilityContext processUtilityContext)
@ -304,82 +318,21 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
(void *) createDatabaseCommand, (void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
} }
/* /*
* citus_internal_database_command is an internal UDF to * PostprocessAlterDatabaseStmt is executed after the statement is applied to the local
* create/drop a database in an idempotent maner without * postgres instance. In this stage we can prepare the commands that need to be run on
* transaction block restrictions. * all workers to drop the database. Since the DROP DATABASE statement gives error in
* transaction context, we need to use NontransactionalNodeDDLTaskList to send the
* DROP DATABASE statement to the workers.
*/ */
Datum
citus_internal_database_command(PG_FUNCTION_ARGS)
{
int saveNestLevel = NewGUCNestLevel();
text *commandText = PG_GETARG_TEXT_P(0);
char *command = text_to_cstring(commandText);
Node *parseTree = ParseTreeNode(command);
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
set_config_option("citus.enable_create_database_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
/*
* createdb() / DropDatabase() uses ParseState to report the error position for the
* input command and the position is reported to be 0 when it's provided as NULL.
* We're okay with that because we don't expect this UDF to be called with an incorrect
* DDL command.
*
*/
ParseState *pstate = NULL;
if (IsA(parseTree, CreatedbStmt))
{
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
bool missingOk = true;
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
if (!OidIsValid(databaseOid))
{
createdb(pstate, (CreatedbStmt *) parseTree);
}
}
else if (IsA(parseTree, DropdbStmt))
{
DropdbStmt *stmt = castNode(DropdbStmt, parseTree);
bool missingOk = false;
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
if (OidIsValid(databaseOid))
{
DropDatabase(pstate, (DropdbStmt *) parseTree);
}
}
else
{
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));
}
/* Below command rollbacks flags to the state before this session*/
AtEOXact_GUC(true, saveNestLevel);
PG_RETURN_VOID();
}
List * List *
PreprocessDropDatabaseStmt(Node *node, const char *queryString, PreprocessDropDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext) ProcessUtilityContext processUtilityContext)
{ {
bool isPostProcess = false;
if (!EnableCreateDatabasePropagation || !ShouldPropagate()) if (!EnableCreateDatabasePropagation || !ShouldPropagate())
{ {
return NIL; return NIL;
@ -389,41 +342,50 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
DropdbStmt *stmt = (DropdbStmt *) node; DropdbStmt *stmt = (DropdbStmt *) node;
bool isPostProcess = false;
List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok,
isPostProcess); isPostProcess);
if (list_length(addresses) == 0) if (list_length(addresses) != 1)
{ {
return NIL; ereport(ERROR, (errmsg("unexpected number of objects found when "
"executing DROP DATABASE command")));
} }
ObjectAddress *address = (ObjectAddress *) linitial(addresses); ObjectAddress *address = (ObjectAddress *) linitial(addresses);
if (address->objectId == InvalidOid || !IsObjectDistributed(address)) if (address->objectId == InvalidOid || !IsAnyObjectDistributed(list_make1(address)))
{ {
return NIL; return NIL;
} }
char *dropDatabaseCommand = DeparseTreeNode(node); char *dropDatabaseCommand = DeparseTreeNode(node);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropDatabaseCommand, (void *) dropDatabaseCommand,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
} }
/*
* GetDatabaseAddressFromDatabaseName gets the database name and returns the ObjectAddress
* of the database.
*/
static ObjectAddress * static ObjectAddress *
GetDatabaseAddressFromDatabaseName(char *databaseName, bool missingOk) GetDatabaseAddressFromDatabaseName(char *databaseName, bool missingOk)
{ {
Oid databaseOid = get_database_oid(databaseName, missingOk); Oid databaseOid = get_database_oid(databaseName, missingOk);
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); ObjectAddress *dbObjectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); ObjectAddressSet(*dbObjectAddress, DatabaseRelationId, databaseOid);
return dbAddress; return dbObjectAddress;
} }
/*
* DropDatabaseStmtObjectAddress gets the ObjectAddress of the database that is the
* object of the DropdbStmt.
*/
List * List *
DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{ {
@ -434,6 +396,10 @@ DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
} }
/*
* CreateDatabaseStmtObjectAddress gets the ObjectAddress of the database that is the
* object of the CreatedbStmt.
*/
List * List *
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{ {
@ -444,6 +410,9 @@ CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
} }
/*
* GetTablespaceName gets the tablespace oid and returns the tablespace name.
*/
static char * static char *
GetTablespaceName(Oid tablespaceOid) GetTablespaceName(Oid tablespaceOid)
{ {
@ -462,19 +431,6 @@ GetTablespaceName(Oid tablespaceOid)
} }
/*
* DatabaseCollationInfo is used to store collation related information of a database
*/
typedef struct DatabaseCollationInfo
{
char *collation;
char *ctype;
#if PG_VERSION_NUM >= PG_VERSION_15
char *icu_locale;
char *collversion;
#endif
} DatabaseCollationInfo;
/* /*
* GetDatabaseCollation gets oid of a database and returns all the collation related information * GetDatabaseCollation gets oid of a database and returns all the collation related information
* We need this method since collation related info in Form_pg_database is not accessible * We need this method since collation related info in Form_pg_database is not accessible
@ -548,6 +504,9 @@ GetDatabaseCollation(Oid db_oid)
} }
/*
* FreeDatabaseCollationInfo frees the memory allocated for DatabaseCollationInfo
*/
static void static void
FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo) FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo)
{ {
@ -569,8 +528,13 @@ FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo)
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
/*
* GetLocaleProviderString gets the datlocprovider stored in pg_database
* and returns the string representation of the datlocprovider
*/
static char * static char *
get_locale_provider_string(char datlocprovider) GetLocaleProviderString(char datlocprovider)
{ {
switch (datlocprovider) switch (datlocprovider)
{ {
@ -599,7 +563,8 @@ get_locale_provider_string(char datlocprovider)
/* /*
* GenerateCreateDatabaseStatementFromPgDatabase is gets the pg_database tuple and returns the CREATE DATABASE statement * GenerateCreateDatabaseStatementFromPgDatabase gets the pg_database tuple and returns the
* CREATE DATABASE statement that can be used to create given database.
*/ */
static char * static char *
GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
@ -642,7 +607,7 @@ GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
if (databaseForm->datlocprovider != 0) if (databaseForm->datlocprovider != 0)
{ {
appendStringInfo(&str, " LOCALE_PROVIDER = '%s'", get_locale_provider_string( appendStringInfo(&str, " LOCALE_PROVIDER = '%s'", GetLocaleProviderString(
databaseForm->datlocprovider)); databaseForm->datlocprovider));
} }
@ -678,19 +643,21 @@ GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
/* /*
* GenerateCreateDatabaseCommandList is gets the pg_database tuples and returns the CREATE DATABASE statement list * GenerateCreateDatabaseCommandList gets a list of pg_database tuples and returns
* for all the databases in the cluster.citus_internal_database_command UDF is used to send the CREATE DATABASE * a list of CREATE DATABASE statements for all the databases.
* statement to the workers since the CREATE DATABASE statement gives error in transaction context. *
* Commands in the list are wrapped by citus_internal_database_command() UDF
* to avoid from transaction block restrictions that apply to database commands
*/ */
List * List *
GenerateCreateDatabaseCommandList(void) GenerateCreateDatabaseCommandList(void)
{ {
List *commands = NIL; List *commands = NIL;
HeapTuple tuple;
Relation pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock); Relation pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock);
TableScanDesc scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL); TableScanDesc scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL);
HeapTuple tuple = NULL;
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple); Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple);
@ -702,7 +669,7 @@ GenerateCreateDatabaseCommandList(void)
/* Generate the CREATE DATABASE statement */ /* Generate the CREATE DATABASE statement */
appendStringInfo(outerDbStmt, appendStringInfo(outerDbStmt,
"select pg_catalog.citus_internal_database_command( %s)", "SELECT pg_catalog.citus_internal_database_command( %s)",
quote_literal_cstr( quote_literal_cstr(
createStmt)); createStmt));

View File

@ -65,6 +65,7 @@ static DefElem * makeDefElemBool(char *name, bool value);
static List * GenerateRoleOptionsList(HeapTuple tuple); static List * GenerateRoleOptionsList(HeapTuple tuple);
static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options); static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options);
static List * GenerateGrantRoleStmtsOfRole(Oid roleid); static List * GenerateGrantRoleStmtsOfRole(Oid roleid);
static void EnsureSequentialModeForRoleDDL(void);
static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple,
TupleDesc DbRoleSettingDescription); TupleDesc DbRoleSettingDescription);

View File

@ -25,7 +25,6 @@
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "distributed/pg_version_constants.h"
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h" #include "miscadmin.h"
@ -35,6 +34,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/pg_database.h"
#include "citus_version.h" #include "citus_version.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/defrem.h" #include "commands/defrem.h"
@ -62,6 +62,7 @@
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_explain.h" #include "distributed/multi_explain.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/pg_version_constants.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/string_utils.h" #include "distributed/string_utils.h"
@ -80,7 +81,6 @@
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "catalog/pg_database.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */ bool EnableDDLPropagation = true; /* ddl propagation is enabled */
@ -579,7 +579,6 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
PreprocessLockStatement((LockStmt *) parsetree, context); PreprocessLockStatement((LockStmt *) parsetree, context);
} }
/* /*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary. * and distribute the partition if necessary.
@ -726,12 +725,13 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
} }
/* /*
* Make sure that dropping the role and database deletes the pg_dist_object entries. There is a * Make sure that dropping node-wide objects deletes the pg_dist_object
* separate logic for roles and database, since roles and database are not included as dropped objects in the * entries. There is a separate logic for node-wide objects (such as role
* drop event trigger. To handle it both on worker and coordinator nodes, it is not * and databases), since they are not included as dropped objects in the
* implemented as a part of process functions but here. * drop event trigger. To handle it both on worker and coordinator nodes,
* it is not implemented as a part of process functions but here.
*/ */
UnmarkRolesAndDatabaseDistributed(parsetree); UnmarkNodeWideObjectsDistributed(parsetree);
pstmt->utilityStmt = parsetree; pstmt->utilityStmt = parsetree;
@ -1265,10 +1265,12 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{ {
ereport(WARNING, ereport(WARNING,
(errmsg( (errmsg(
"Commands that are not transaction-safe may result in partial failure" "Commands that are not transaction-safe may result in "
", potentially leading to an inconsistent state.\nIf the problematic command" "partial failure, potentially leading to an inconsistent "
" is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the " "state.\nIf the problematic command is a CREATE operation, "
"object,\nif applicable, and then reattempt the original command."))); "consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));
PG_RE_THROW(); PG_RE_THROW();
} }
} }
@ -1483,12 +1485,12 @@ DDLTaskList(Oid relationId, const char *commandString)
/* /*
* NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a * NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a
* given target set of nodes with cannotBeExecutedInTransaction is set to make sure * given target set of nodes with cannotBeExecutedInTransaction is set to make sure
* that list is being executed without a transaction. * that task list is executed outside a transaction block.
*/ */
List * List *
NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands) NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
{ {
List *ddlJobs = NodeDDLTaskList(targets, commands); List *ddlJobs = NodeDDLTaskList(targets, commands);
DDLJob *ddlJob = NULL; DDLJob *ddlJob = NULL;

View File

@ -1,56 +1,66 @@
/*
* citus_deparseutils.c
* ---------------------
*
* This file contains common functions used for deparsing PostgreSQL statements
* to their equivalent SQL representation.
*
*/
#include "postgres.h" #include "postgres.h"
#include "utils/builtins.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "distributed/deparser.h"
#include "distributed/pg_version_constants.h"
#include "utils/builtins.h"
#include "utils/elog.h" #include "utils/elog.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h" #include "utils/relcache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/typcache.h" #include "utils/typcache.h"
#include "distributed/deparser.h"
#include "distributed/pg_version_constants.h"
/** /**
* Convert a DefElem option to a SQL statement and append it to the given StringInfo buffer. * DefElemOptionToStatement converts a DefElem option to a SQL statement and
* appends it to the given StringInfo buffer.
* *
* @param buf The StringInfo buffer to append the SQL statement to. * @param buf The StringInfo buffer to append the SQL statement to.
* @param option The DefElem option to convert to a SQL statement. * @param option The DefElem option to convert to a SQL statement.
* @param opt_formats The option format specification to use for the conversion. * @param optionFormats The option format specification to use for the conversion.
* @param num_opt_formats The number of option formats in the opt_formats array. * @param optionFormatsLen The number of option formats in the opt_formats array.
*/ */
void void
optionToStatement(StringInfo buf, DefElem *option, const struct DefElemOptionToStatement(StringInfo buf, DefElem *option, const
option_format *opt_formats, int DefElemOptionFormat *optionFormats, int
opt_formats_len) optionFormatsLen)
{ {
const char *name = option->defname; const char *name = option->defname;
int i; int i;
for (i = 0; i < opt_formats_len; i++) for (i = 0; i < optionFormatsLen; i++)
{ {
if (strcmp(name, opt_formats[i].name) == 0) if (strcmp(name, optionFormats[i].name) == 0)
{ {
switch (opt_formats[i].type) switch (optionFormats[i].type)
{ {
case OPTION_FORMAT_STRING: case OPTION_FORMAT_STRING:
{ {
char *value = defGetString(option); char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); appendStringInfo(buf, optionFormats[i].format, quote_identifier(
value));
break; break;
} }
case OPTION_FORMAT_INTEGER: case OPTION_FORMAT_INTEGER:
{ {
int32 value = defGetInt32(option); int32 value = defGetInt32(option);
appendStringInfo(buf, opt_formats[i].format, value); appendStringInfo(buf, optionFormats[i].format, value);
break; break;
} }
case OPTION_FORMAT_BOOLEAN: case OPTION_FORMAT_BOOLEAN:
{ {
bool value = defGetBoolean(option); bool value = defGetBoolean(option);
appendStringInfo(buf, opt_formats[i].format, value ? "true" : appendStringInfo(buf, optionFormats[i].format, value ? "true" :
"false"); "false");
break; break;
} }
@ -59,7 +69,7 @@ optionToStatement(StringInfo buf, DefElem *option, const struct
case OPTION_FORMAT_OBJECT_ID: case OPTION_FORMAT_OBJECT_ID:
{ {
Oid value = defGetObjectId(option); Oid value = defGetObjectId(option);
appendStringInfo(buf, opt_formats[i].format, value); appendStringInfo(buf, optionFormats[i].format, value);
break; break;
} }
@ -67,14 +77,14 @@ optionToStatement(StringInfo buf, DefElem *option, const struct
case OPTION_FORMAT_LITERAL_CSTR: case OPTION_FORMAT_LITERAL_CSTR:
{ {
char *value = defGetString(option); char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr( appendStringInfo(buf, optionFormats[i].format, quote_literal_cstr(
value)); value));
break; break;
} }
default: default:
{ {
elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); elog(ERROR, "unrecognized option type: %d", optionFormats[i].type);
break; break;
} }
} }

View File

@ -12,25 +12,25 @@
#include "postgres.h" #include "postgres.h"
#include "pg_version_compat.h" #include "pg_version_compat.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "distributed/deparser.h"
#include "distributed/citus_ruleutils.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "distributed/deparser.h" #include "distributed/deparser.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "distributed/listutils.h"
static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); static void AppendDefElemConnLimit(StringInfo buf, DefElem *def);
const struct option_format create_database_option_formats[] = { const DefElemOptionFormat create_database_option_formats[] = {
{ "owner", " OWNER %s", OPTION_FORMAT_STRING }, { "owner", " OWNER %s", OPTION_FORMAT_STRING },
{ "template", " TEMPLATE %s", OPTION_FORMAT_STRING }, { "template", " TEMPLATE %s", OPTION_FORMAT_STRING },
{ "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR }, { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR },
@ -49,6 +49,22 @@ const struct option_format create_database_option_formats[] = {
{ "oid", " OID %d", OPTION_FORMAT_OBJECT_ID } { "oid", " OID %d", OPTION_FORMAT_OBJECT_ID }
}; };
/*
* DeparseAlterDatabaseOwnerStmt
* Deparse an AlterDatabaseOwnerStmt node
*
* This function is responsible for producing a string representation of an
* AlterDatabaseOwnerStmt node, which represents an ALTER DATABASE statement
* that changes the owner of a database. The output string includes the ALTER
* DATABASE keyword, the name of the database being altered, and the new owner
* of the database.
*
* Parameters:
* - node: a pointer to the AlterDatabaseOwnerStmt node to be deparsed
*
* Returns:
* - a string representation of the ALTER DATABASE statement
*/
char * char *
DeparseAlterDatabaseOwnerStmt(Node *node) DeparseAlterDatabaseOwnerStmt(Node *node)
{ {
@ -64,6 +80,15 @@ DeparseAlterDatabaseOwnerStmt(Node *node)
} }
/*
*
* AppendAlterDatabaseOwnerStmt
* Append an ALTER DATABASE statement for changing the owner of a database to the given StringInfo buffer.
*
* Parameters:
* - buf: The StringInfo buffer to append the statement to.
* - stmt: The AlterOwnerStmt representing the ALTER DATABASE statement to append.
*/
static void static void
AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{ {
@ -226,19 +251,14 @@ DeparseAlterDatabaseSetStmt(Node *node)
} }
/*
* Validates for if option is template, lc_type, locale or lc_collate, propagation will
* not be supported since template and strategy options are not stored in the catalog
* and lc_type, locale and lc_collate options depends on template parameter.
*/
static void static void
AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) ValidateCreateDatabaseOptions(DefElem *option)
{ {
appendStringInfo(buf,
"CREATE DATABASE %s",
quote_identifier(stmt->dbname));
DefElem *option = NULL;
foreach_ptr(option, stmt->options)
{
/*If option is template, lc_type, locale or lc_collate, propagation will not be supportted */
/* since template database is not stored in the catalog */
if (strcmp(option->defname, "template") == 0 || if (strcmp(option->defname, "template") == 0 ||
strcmp(option->defname, "strategy") == 0 || strcmp(option->defname, "strategy") == 0 ||
strcmp(option->defname, "lc_ctype") == 0 || strcmp(option->defname, "lc_ctype") == 0 ||
@ -251,13 +271,35 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt)
errmsg("CREATE DATABASE option \"%s\" is not supported", errmsg("CREATE DATABASE option \"%s\" is not supported",
option->defname)); option->defname));
} }
}
optionToStatement(buf, option, create_database_option_formats, lengthof(
create_database_option_formats)); /*
* Prepares a CREATE DATABASE statement with given empty StringInfo buffer and CreatedbStmt node.
*/
static void
AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt)
{
appendStringInfo(buf,
"CREATE DATABASE %s",
quote_identifier(stmt->dbname));
DefElem *option = NULL;
foreach_ptr(option, stmt->options)
{
ValidateCreateDatabaseOptions(option);
DefElemOptionToStatement(buf, option, create_database_option_formats,
lengthof(create_database_option_formats));
} }
} }
/*
* Converts a CreatedbStmt structure into a SQL command string.
* Used in the deparsing of Create database statement.
*/
char * char *
DeparseCreateDatabaseStmt(Node *node) DeparseCreateDatabaseStmt(Node *node)
{ {
@ -271,13 +313,16 @@ DeparseCreateDatabaseStmt(Node *node)
} }
/*
* Prepares a DROP DATABASE statement with given empty StringInfo buffer and DropdbStmt node.
*/
static void static void
AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt) AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt)
{ {
char *if_exists_statement = stmt->missing_ok ? "IF EXISTS" : ""; char *ifExistsStatement = stmt->missing_ok ? "IF EXISTS" : "";
appendStringInfo(buf, appendStringInfo(buf,
"DROP DATABASE %s %s", "DROP DATABASE %s %s",
if_exists_statement, ifExistsStatement,
quote_identifier(stmt->dbname)); quote_identifier(stmt->dbname));
DefElem *option = NULL; DefElem *option = NULL;
@ -298,6 +343,10 @@ AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt)
} }
/*
* Converts a DropdbStmt structure into a SQL command string.
* Used in the deparsing of drop database statement.
*/
char * char *
DeparseDropDatabaseStmt(Node *node) DeparseDropDatabaseStmt(Node *node)
{ {

View File

@ -55,6 +55,7 @@
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues); Datum *paramValues);
static bool IsObjectDistributed(const ObjectAddress *address);
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
@ -358,8 +359,12 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
} }
/*
* Deletes all pg_dist_object records for distributed roles in `DROP ROLE` statement a
* and for all databases in `DROP DATABASE` statement
*/
void void
UnmarkRolesAndDatabaseDistributed(Node *node) UnmarkNodeWideObjectsDistributed(Node *node)
{ {
if (IsA(node, DropRoleStmt)) if (IsA(node, DropRoleStmt))
{ {
@ -378,9 +383,9 @@ UnmarkRolesAndDatabaseDistributed(Node *node)
char *dbName = stmt->dbname; char *dbName = stmt->dbname;
Oid dbOid = get_database_oid(dbName, stmt->missing_ok); Oid dbOid = get_database_oid(dbName, stmt->missing_ok);
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); ObjectAddress *dbObjectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); ObjectAddressSet(*dbObjectAddress, DatabaseRelationId, dbOid);
UnmarkObjectDistributed(dbAddress); UnmarkObjectDistributed(dbObjectAddress);
} }
} }
@ -420,7 +425,7 @@ UnmarkObjectDistributed(const ObjectAddress *address)
* IsObjectDistributed returns if the object addressed is already distributed in the * IsObjectDistributed returns if the object addressed is already distributed in the
* cluster. This performs a local indexed lookup in pg_dist_object. * cluster. This performs a local indexed lookup in pg_dist_object.
*/ */
bool static bool
IsObjectDistributed(const ObjectAddress *address) IsObjectDistributed(const ObjectAddress *address)
{ {
ScanKeyData key[3]; ScanKeyData key[3];

View File

@ -30,12 +30,15 @@
#include "catalog/pg_attrdef.h" #include "catalog/pg_attrdef.h"
#include "catalog/pg_collation.h" #include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "catalog/pg_database.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_depend.h" #include "catalog/pg_depend.h"
#include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h" #include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/async.h" #include "commands/async.h"
#include "commands/dbcommands.h"
#include "distributed/argutils.h" #include "distributed/argutils.h"
#include "distributed/backend_data.h" #include "distributed/backend_data.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
@ -179,6 +182,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_database_command);
static bool got_SIGTERM = false; static bool got_SIGTERM = false;
@ -3895,6 +3899,80 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
} }
/*
* citus_internal_database_command is an internal UDF to
* create/drop a database in an idempotent maner without
* transaction block restrictions.
*/
Datum
citus_internal_database_command(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}
PG_ENSURE_ARGNOTNULL(0, "database command");
text *commandText = PG_GETARG_TEXT_P(0);
char *command = text_to_cstring(commandText);
Node *parseTree = ParseTreeNode(command);
int saveNestLevel = NewGUCNestLevel();
set_config_option("citus.enable_ddl_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
set_config_option("citus.enable_create_database_propagation", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
/*
* createdb() / DropDatabase() uses ParseState to report the error position for the
* input command and the position is reported to be 0 when it's provided as NULL.
* We're okay with that because we don't expect this UDF to be called with an incorrect
* DDL command.
*/
ParseState *pstate = NULL;
if (IsA(parseTree, CreatedbStmt))
{
CreatedbStmt *stmt = castNode(CreatedbStmt, parseTree);
bool missingOk = true;
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
if (!OidIsValid(databaseOid))
{
createdb(pstate, (CreatedbStmt *) parseTree);
}
}
else if (IsA(parseTree, DropdbStmt))
{
DropdbStmt *stmt = castNode(DropdbStmt, parseTree);
bool missingOk = false;
Oid databaseOid = get_database_oid(stmt->dbname, missingOk);
if (OidIsValid(databaseOid))
{
DropDatabase(pstate, (DropdbStmt *) parseTree);
}
}
else
{
ereport(ERROR, (errmsg("unsupported command type %d", nodeTag(parseTree))));
}
/* Rollbacks GUCs to the state before this session */
AtEOXact_GUC(true, saveNestLevel);
PG_RETURN_VOID();
}
/* /*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/ */
@ -4503,7 +4581,7 @@ PropagateNodeWideObjectsCommandList(void)
if (EnableCreateDatabasePropagation) if (EnableCreateDatabasePropagation)
{ {
/* Get commands for database creation */ /* get commands for database creation */
List *createDatabaseCommands = GenerateCreateDatabaseCommandList(); List *createDatabaseCommands = GenerateCreateDatabaseCommandList();
ddlCommands = list_concat(ddlCommands, createDatabaseCommands); ddlCommands = list_concat(ddlCommands, createDatabaseCommands);
} }

View File

@ -1266,7 +1266,7 @@ RegisterCitusConfigVariables(void)
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_create_database_propagation", "citus.enable_create_database_propagation",
gettext_noop("Enables propagating CREATE DATABASE " gettext_noop("Enables propagating CREATE DATABASE "
"and DROP DATABASE statements to workers"), "and DROP DATABASE statements to workers."),
NULL, NULL,
&EnableCreateDatabasePropagation, &EnableCreateDatabasePropagation,
false, false,

View File

@ -4,7 +4,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text)
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
STRICT VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
'run a database command without transaction block restrictions'; 'run a database command without transaction block restrictions';

View File

@ -4,7 +4,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text)
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
STRICT VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
'run a database command without transaction block restrictions'; 'run a database command without transaction block restrictions';

View File

@ -230,9 +230,6 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que
ProcessUtilityContext ProcessUtilityContext
processUtilityContext); processUtilityContext);
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
@ -242,10 +239,10 @@ extern List * PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString); extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString);
extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok,
isPostprocess); bool isPostprocess);
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok,
isPostprocess); bool isPostprocess);
extern List * GenerateCreateDatabaseCommandList(void); extern List * GenerateCreateDatabaseCommandList(void);
@ -518,7 +515,6 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool
extern void UnmarkRolesDistributed(List *roles); extern void UnmarkRolesDistributed(List *roles);
extern List * FilterDistributedRoles(List *roles); extern List * FilterDistributedRoles(List *roles);
extern void EnsureSequentialModeForRoleDDL(void);
/* schema.c - forward declarations */ /* schema.c - forward declarations */
extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString); extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString);

View File

@ -94,7 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
extern void MarkInvalidateForeignKeyGraph(void); extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void); extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString); extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands); extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void); extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void); extern bool DropSchemaOrDBInProgress(void);

View File

@ -123,12 +123,12 @@ extern void AppendGrantSharedSuffix(StringInfo buf, GrantStmt *stmt);
/* Common deparser utils */ /* Common deparser utils */
struct option_format typedef struct DefElemOptionFormat
{ {
const char *name; char *name;
const char *format; char *format;
const int type; int type;
}; } DefElemOptionFormat;
typedef enum OptionFormatType typedef enum OptionFormatType
{ {
@ -140,8 +140,8 @@ typedef enum OptionFormatType
} OptionFormatType; } OptionFormatType;
extern void optionToStatement(StringInfo buf, DefElem *option, const struct extern void DefElemOptionToStatement(StringInfo buf, DefElem *option, const
option_format *opt_formats, int DefElemOptionFormat *opt_formats, int
opt_formats_len); opt_formats_len);

View File

@ -21,13 +21,12 @@
extern bool ObjectExists(const ObjectAddress *address); extern bool ObjectExists(const ObjectAddress *address);
extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsAnyObjectDistributed(const List *addresses); extern bool IsAnyObjectDistributed(const List *addresses);
extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern void UnmarkRolesAndDatabaseDistributed(Node *node); extern void UnmarkNodeWideObjectsDistributed(Node *node);
extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsTableOwnedByExtension(Oid relationId);
extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target);
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets, extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,

View File

@ -151,8 +151,6 @@ DEPS = {
], ],
worker_count=6, worker_count=6,
), ),
"create_drop_database_propagation": TestDeps("minimal_schedule"),
"create_drop_database_propagation_pg15": TestDeps("minimal_schedule"),
"function_propagation": TestDeps("minimal_schedule"), "function_propagation": TestDeps("minimal_schedule"),
"citus_shards": TestDeps("minimal_schedule"), "citus_shards": TestDeps("minimal_schedule"),
"grant_on_foreign_server_propagation": TestDeps("minimal_schedule"), "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),