Merge branch 'create_alter_database' of https://github.com/citusdata/citus into create_alter_database

pull/7240/head
gindibay 2023-11-13 17:17:39 +03:00
commit 05d8d4454f
1 changed files with 78 additions and 118 deletions

View File

@ -49,12 +49,16 @@
*/
typedef struct DatabaseCollationInfo
{
char *collation;
char *ctype;
char *datcollate;
char *datctype;
#if PG_VERSION_NUM >= PG_VERSION_15
char *icuLocale;
char *collversion;
char *icuRules;
char *daticulocale;
char *datcollversion;
#endif
#if PG_VERSION_NUM >= PG_VERSION_16
char *daticurules;
#endif
} DatabaseCollationInfo;
@ -486,15 +490,14 @@ GetTablespaceName(Oid tablespaceOid)
/*
* GetDatabaseCollation gets oid of a database and returns all the collation related information
* We need this method since collation related info in Form_pg_database is not accessible
* We need this method since collation related info in Form_pg_database is not accessible.
*/
static DatabaseCollationInfo
GetDatabaseCollation(Oid dbOid)
{
DatabaseCollationInfo info;
bool isNull;
memset(&info, 0, sizeof(DatabaseCollationInfo));
Snapshot snapshot = RegisterSnapshot(GetLatestSnapshot());
Relation rel = table_open(DatabaseRelationId, AccessShareLock);
HeapTuple tup = get_catalog_object_by_oid(rel, Anum_pg_database_oid, dbOid);
if (!HeapTupleIsValid(tup))
@ -502,69 +505,44 @@ GetDatabaseCollation(Oid dbOid)
elog(ERROR, "cache lookup failed for database %u", dbOid);
}
bool isNull = false;
TupleDesc tupdesc = RelationGetDescr(rel);
Datum collationDatum = heap_getattr(tup, Anum_pg_database_datcollate, tupdesc,
&isNull);
if (isNull)
{
info.collation = NULL;
}
else
{
info.collation = TextDatumGetCString(collationDatum);
}
info.datcollate = TextDatumGetCString(collationDatum);
Datum ctypeDatum = heap_getattr(tup, Anum_pg_database_datctype, tupdesc, &isNull);
if (isNull)
{
info.ctype = NULL;
}
else
{
info.ctype = TextDatumGetCString(ctypeDatum);
}
info.datctype = TextDatumGetCString(ctypeDatum);
#if PG_VERSION_NUM >= PG_VERSION_15
Datum icuLocaleDatum = heap_getattr(tup, Anum_pg_database_daticulocale, tupdesc,
&isNull);
if (isNull)
if (!isNull)
{
info.icuLocale = NULL;
}
else
{
info.icuLocale = TextDatumGetCString(icuLocaleDatum);
info.daticulocale = TextDatumGetCString(icuLocaleDatum);
}
Datum collverDatum = heap_getattr(tup, Anum_pg_database_datcollversion, tupdesc,
&isNull);
if (isNull)
if (!isNull)
{
info.collversion = NULL;
}
else
{
info.collversion = TextDatumGetCString(collverDatum);
info.datcollversion = TextDatumGetCString(collverDatum);
}
#endif
#if PG_VERSION_NUM >= PG_VERSION_16
Datum icuRulesDatum = heap_getattr(tup, Anum_pg_database_daticurules, tupdesc,
Datum icurulesDatum = heap_getattr(tup, Anum_pg_database_daticurules, tupdesc,
&isNull);
if (isNull)
if (!isNull)
{
info.icuRules = NULL;
}
else
{
info.icuRules = TextDatumGetCString(icuRulesDatum);
info.daticurules = TextDatumGetCString(icurulesDatum);
}
#endif
table_close(rel, AccessShareLock);
UnregisterSnapshot(snapshot);
heap_freetuple(tup);
return info;
@ -592,13 +570,11 @@ GetLocaleProviderString(char datlocprovider)
return "icu";
}
case 'l':
{
return "locale";
}
default:
return "";
{
ereport(ERROR, (errmsg("unexpected datlocprovider value: %c",
datlocprovider)));
}
}
}
@ -609,6 +585,10 @@ GetLocaleProviderString(char datlocprovider)
/*
* GenerateCreateDatabaseStatementFromPgDatabase gets the pg_database tuple and returns the
* CREATE DATABASE statement that can be used to create given database.
*
* Note that this doesn't deparse OID of the database and this is not a
* problem as we anyway don't allow specifying custom OIDs for databases
* when creating them.
*/
static char *
GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
@ -621,73 +601,53 @@ GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
appendStringInfo(&str, "CREATE DATABASE %s",
quote_identifier(NameStr(databaseForm->datname)));
if (databaseForm->datdba != InvalidOid)
{
appendStringInfo(&str, " OWNER = %s",
quote_literal_cstr(GetUserNameFromId(databaseForm->datdba,
false)));
}
if (databaseForm->encoding != -1)
{
appendStringInfo(&str, " ENCODING = %s",
quote_literal_cstr(pg_encoding_to_char(databaseForm->encoding)));
}
if (collInfo.collation != NULL)
{
appendStringInfo(&str, " LC_COLLATE = %s", quote_literal_cstr(
collInfo.collation));
}
if (collInfo.ctype != NULL)
{
appendStringInfo(&str, " LC_CTYPE = %s", quote_literal_cstr(collInfo.ctype));
}
#if PG_VERSION_NUM >= PG_VERSION_15
if (collInfo.icuLocale != NULL)
{
appendStringInfo(&str, " ICU_LOCALE = %s", quote_literal_cstr(
collInfo.icuLocale));
}
if (databaseForm->datlocprovider != 0)
{
appendStringInfo(&str, " LOCALE_PROVIDER = %s",
quote_literal_cstr(GetLocaleProviderString(
databaseForm->datlocprovider)));
}
if (collInfo.collversion != NULL)
{
appendStringInfo(&str, " COLLATION_VERSION = %s", quote_literal_cstr(
collInfo.collversion));
}
#endif
if (databaseForm->dattablespace != InvalidOid)
{
appendStringInfo(&str, " TABLESPACE = %s",
quote_identifier(GetTablespaceName(
databaseForm->dattablespace)));
}
appendStringInfo(&str, " CONNECTION LIMIT %d", databaseForm->datconnlimit);
appendStringInfo(&str, " ALLOW_CONNECTIONS = %s",
quote_literal_cstr(databaseForm->datallowconn ? "true" : "false"));
if (databaseForm->datconnlimit >= 0)
{
appendStringInfo(&str, " CONNECTION LIMIT %d", databaseForm->datconnlimit);
}
if(collInfo.icuRules != NULL){
appendStringInfo(&str, " ICU_RULES = %s",
quote_literal_cstr(collInfo.icuRules));
}
appendStringInfo(&str, " IS_TEMPLATE = %s",
quote_literal_cstr(databaseForm->datistemplate ? "true" : "false"));
appendStringInfo(&str, " LC_COLLATE = %s",
quote_literal_cstr(collInfo.datcollate));
appendStringInfo(&str, " LC_CTYPE = %s", quote_literal_cstr(collInfo.datctype));
appendStringInfo(&str, " OWNER = %s",
quote_literal_cstr(GetUserNameFromId(databaseForm->datdba, false)));
appendStringInfo(&str, " TABLESPACE = %s",
quote_identifier(GetTablespaceName(databaseForm->dattablespace)));
appendStringInfo(&str, " ENCODING = %s",
quote_literal_cstr(pg_encoding_to_char(databaseForm->encoding)));
#if PG_VERSION_NUM >= PG_VERSION_15
if (collInfo.datcollversion != NULL)
{
appendStringInfo(&str, " COLLATION_VERSION = %s",
quote_literal_cstr(collInfo.datcollversion));
}
if (collInfo.daticulocale != NULL)
{
appendStringInfo(&str, " ICU_LOCALE = %s", quote_literal_cstr(
collInfo.daticulocale));
}
appendStringInfo(&str, " LOCALE_PROVIDER = %s",
quote_literal_cstr(GetLocaleProviderString(
databaseForm->datlocprovider)));
#endif
#if PG_VERSION_NUM >= PG_VERSION_16
if (collInfo.daticurules != NULL)
{
appendStringInfo(&str, " ICU_RULES = %s", quote_literal_cstr(
collInfo.daticurules));
}
#endif
return str.data;
}