Adds metadatasync for create database

pull/7240/head
gindibay 2023-10-26 20:36:43 +03:00
parent c8fcf080c2
commit 641e413676
3 changed files with 259 additions and 0 deletions

View File

@ -35,6 +35,15 @@
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "access/htup_details.h"
#include "catalog/pg_tablespace.h"
#include "access/heapam.h"
#include "utils/relcache.h"
#include "utils/rel.h"
#include "utils/lsyscache.h"
#include "catalog/pg_collation.h"
#include "utils/relcache.h"
#include "catalog/pg_database_d.h"
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
@ -415,3 +424,245 @@ CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
missing_ok); missing_ok);
return list_make1(dbAddress); return list_make1(dbAddress);
} }
static char *
GetTablespaceName(Oid tablespaceOid)
{
HeapTuple tuple = SearchSysCache1(TABLESPACEOID, ObjectIdGetDatum(tablespaceOid));
if (!HeapTupleIsValid(tuple))
{
return NULL;
}
Form_pg_tablespace tablespaceForm = (Form_pg_tablespace) GETSTRUCT(tuple);
char *tablespaceName = NameStr(tablespaceForm->spcname);
ReleaseSysCache(tuple);
return tablespaceName;
}
/*
* DatabaseCollationInfo is used to store collation related information of a database
*/
typedef struct DatabaseCollationInfo
{
char *collation;
char *ctype;
char *icu_locale;
char *collversion;
} DatabaseCollationInfo;
/*
* GetDatabaseCollation gets oid of a database and returns all the collation related information
* We need this method since collation related info in Form_pg_database is not accessible
*/
static DatabaseCollationInfo
GetDatabaseCollation(Oid db_oid)
{
HeapTuple tup;
DatabaseCollationInfo info;
Datum collationDatum, ctypeDatum, icuLocaleDatum, collverDatum;
bool isNull;
Relation rel;
TupleDesc tupdesc;
Snapshot snapshot;
snapshot = RegisterSnapshot(GetLatestSnapshot());
rel = table_open(DatabaseRelationId, AccessShareLock);
tup = get_catalog_object_by_oid(rel, Anum_pg_database_oid, db_oid);
if (!HeapTupleIsValid(tup))
{
elog(ERROR, "cache lookup failed for database %u", db_oid);
}
tupdesc = RelationGetDescr(rel);
collationDatum = heap_getattr(tup, Anum_pg_database_datcollate, tupdesc, &isNull);
if (isNull)
{
info.collation = NULL;
}
else
{
info.collation = TextDatumGetCString(collationDatum);
}
ctypeDatum = heap_getattr(tup, Anum_pg_database_datctype, tupdesc, &isNull);
if (isNull)
{
info.ctype = NULL;
}
else
{
info.ctype = TextDatumGetCString(ctypeDatum);
}
icuLocaleDatum = heap_getattr(tup, Anum_pg_database_daticulocale, tupdesc, &isNull);
if (isNull)
{
info.icu_locale = NULL;
}
else
{
info.icu_locale = TextDatumGetCString(icuLocaleDatum);
}
collverDatum = heap_getattr(tup, Anum_pg_database_datcollversion, tupdesc, &isNull);
if (isNull)
{
info.collversion = NULL;
}
else
{
info.collversion = TextDatumGetCString(collverDatum);
}
table_close(rel, AccessShareLock);
UnregisterSnapshot(snapshot);
heap_freetuple(tup);
return info;
}
static void
FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo)
{
if (collInfo.collation != NULL)
{
pfree(collInfo.collation);
}
if (collInfo.ctype != NULL)
{
pfree(collInfo.ctype);
}
if (collInfo.icu_locale != NULL)
{
pfree(collInfo.icu_locale);
}
}
/*
* GenerateCreateDatabaseStatementFromPgDatabase is gets the pg_database tuple and returns the CREATE DATABASE statement
*/
static char *
GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm)
{
DatabaseCollationInfo collInfo = GetDatabaseCollation(databaseForm->oid);
elog(LOG, "collInfo: %s %s %s %s", collInfo.collation, collInfo.ctype,
collInfo.icu_locale, collInfo.collversion);
StringInfoData str;
initStringInfo(&str);
appendStringInfo(&str, "CREATE DATABASE %s", quote_identifier(NameStr(
databaseForm->
datname)));
if (databaseForm->datdba != InvalidOid)
{
appendStringInfo(&str, " OWNER = %s", GetUserNameFromId(databaseForm->datdba,
false));
}
if (databaseForm->encoding != -1)
{
appendStringInfo(&str, " ENCODING = '%s'", pg_encoding_to_char(
databaseForm->encoding));
}
if (collInfo.collation != NULL)
{
appendStringInfo(&str, " LC_COLLATE = '%s'", collInfo.collation);
}
if (collInfo.ctype != NULL)
{
appendStringInfo(&str, " LC_CTYPE = '%s'", collInfo.ctype);
}
if (collInfo.icu_locale != NULL)
{
appendStringInfo(&str, " ICU_LOCALE = '%s'", collInfo.icu_locale);
}
if (databaseForm->datlocprovider != 0)
{
appendStringInfo(&str, " LOCALE_PROVIDER = '%c'", databaseForm->datlocprovider);
}
if (collInfo.collversion != NULL)
{
appendStringInfo(&str, " COLLATION_VERSION = '%s'", collInfo.collversion);
}
if (databaseForm->dattablespace != InvalidOid)
{
appendStringInfo(&str, " TABLESPACE = %s", quote_identifier(GetTablespaceName(
databaseForm->
dattablespace)));
}
appendStringInfo(&str, " ALLOW_CONNECTIONS = '%s'", databaseForm->datallowconn ?
"true" : "false");
if (databaseForm->datconnlimit >= 0)
{
appendStringInfo(&str, " CONNECTION LIMIT %d", databaseForm->datconnlimit);
}
appendStringInfo(&str, " IS_TEMPLATE = '%s'", databaseForm->datistemplate ? "true" :
"false");
FreeDatabaseCollationInfo(collInfo);
return str.data;
}
/*
* GenerateCreateDatabaseCommandList is gets the pg_database tuples and returns the CREATE DATABASE statement list
* for all the databases in the cluster.citus_internal_database_command UDF is used to send the CREATE DATABASE
* statement to the workers since the CREATE DATABASE statement gives error in transaction context.
*/
List *
GenerateCreateDatabaseCommandList(void)
{
List *commands = NIL;
HeapTuple tuple;
Relation pgDatabaseRel;
TableScanDesc scan;
pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock);
scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple);
char *createStmt = GenerateCreateDatabaseStatementFromPgDatabase(databaseForm);
StringInfo outerDbStmt;
outerDbStmt = makeStringInfo();
/* Generate the CREATE DATABASE statement */
appendStringInfo(outerDbStmt,
"select pg_catalog.citus_internal_database_command( %s)",
quote_literal_cstr(
createStmt));
elog(LOG, "outerDbStmt: %s", outerDbStmt->data);
/* Add the statement to the list of commands */
commands = lappend(commands, outerDbStmt->data);
}
heap_endscan(scan);
table_close(pgDatabaseRel, AccessShareLock);
return commands;
}

View File

@ -4501,6 +4501,13 @@ PropagateNodeWideObjectsCommandList(void)
/* collect all commands */ /* collect all commands */
List *ddlCommands = NIL; List *ddlCommands = NIL;
if (EnableCreateDatabasePropagation)
{
/* Get commands for database creation */
List *createDatabaseCommands = GenerateCreateDatabaseCommandList();
ddlCommands = list_concat(ddlCommands, createDatabaseCommands);
}
if (EnableAlterRoleSetPropagation) if (EnableAlterRoleSetPropagation)
{ {
/* /*

View File

@ -244,6 +244,7 @@ extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess); isPostprocess);
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess); isPostprocess);
extern List * GenerateCreateDatabaseCommandList(void);
/* domain.c - forward declarations */ /* domain.c - forward declarations */