diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 7f0f03aa6..443c0f366 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -35,6 +35,15 @@ #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/adaptive_executor.h" +#include "access/htup_details.h" +#include "catalog/pg_tablespace.h" +#include "access/heapam.h" +#include "utils/relcache.h" +#include "utils/rel.h" +#include "utils/lsyscache.h" +#include "catalog/pg_collation.h" +#include "utils/relcache.h" +#include "catalog/pg_database_d.h" static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); @@ -415,3 +424,245 @@ CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) missing_ok); return list_make1(dbAddress); } + + +static char * +GetTablespaceName(Oid tablespaceOid) +{ + HeapTuple tuple = SearchSysCache1(TABLESPACEOID, ObjectIdGetDatum(tablespaceOid)); + if (!HeapTupleIsValid(tuple)) + { + return NULL; + } + + Form_pg_tablespace tablespaceForm = (Form_pg_tablespace) GETSTRUCT(tuple); + char *tablespaceName = NameStr(tablespaceForm->spcname); + + ReleaseSysCache(tuple); + + return tablespaceName; +} + + +/* + * DatabaseCollationInfo is used to store collation related information of a database + */ +typedef struct DatabaseCollationInfo +{ + char *collation; + char *ctype; + char *icu_locale; + char *collversion; +} DatabaseCollationInfo; + +/* + * GetDatabaseCollation gets oid of a database and returns all the collation related information + * We need this method since collation related info in Form_pg_database is not accessible + */ +static DatabaseCollationInfo +GetDatabaseCollation(Oid db_oid) +{ + HeapTuple tup; + DatabaseCollationInfo info; + Datum collationDatum, ctypeDatum, icuLocaleDatum, collverDatum; + bool isNull; + Relation rel; + TupleDesc tupdesc; + Snapshot snapshot; + + snapshot = RegisterSnapshot(GetLatestSnapshot()); + rel = table_open(DatabaseRelationId, AccessShareLock); + tup = get_catalog_object_by_oid(rel, Anum_pg_database_oid, db_oid); + if (!HeapTupleIsValid(tup)) + { + elog(ERROR, "cache lookup failed for database %u", db_oid); + } + + tupdesc = RelationGetDescr(rel); + collationDatum = heap_getattr(tup, Anum_pg_database_datcollate, tupdesc, &isNull); + if (isNull) + { + info.collation = NULL; + } + else + { + info.collation = TextDatumGetCString(collationDatum); + } + + ctypeDatum = heap_getattr(tup, Anum_pg_database_datctype, tupdesc, &isNull); + if (isNull) + { + info.ctype = NULL; + } + else + { + info.ctype = TextDatumGetCString(ctypeDatum); + } + + icuLocaleDatum = heap_getattr(tup, Anum_pg_database_daticulocale, tupdesc, &isNull); + if (isNull) + { + info.icu_locale = NULL; + } + else + { + info.icu_locale = TextDatumGetCString(icuLocaleDatum); + } + + collverDatum = heap_getattr(tup, Anum_pg_database_datcollversion, tupdesc, &isNull); + if (isNull) + { + info.collversion = NULL; + } + else + { + info.collversion = TextDatumGetCString(collverDatum); + } + + table_close(rel, AccessShareLock); + UnregisterSnapshot(snapshot); + heap_freetuple(tup); + + return info; +} + + +static void +FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo) +{ + if (collInfo.collation != NULL) + { + pfree(collInfo.collation); + } + if (collInfo.ctype != NULL) + { + pfree(collInfo.ctype); + } + if (collInfo.icu_locale != NULL) + { + pfree(collInfo.icu_locale); + } +} + + +/* + * GenerateCreateDatabaseStatementFromPgDatabase is gets the pg_database tuple and returns the CREATE DATABASE statement + */ +static char * +GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) +{ + DatabaseCollationInfo collInfo = GetDatabaseCollation(databaseForm->oid); + elog(LOG, "collInfo: %s %s %s %s", collInfo.collation, collInfo.ctype, + collInfo.icu_locale, collInfo.collversion); + + StringInfoData str; + initStringInfo(&str); + + appendStringInfo(&str, "CREATE DATABASE %s", quote_identifier(NameStr( + databaseForm-> + datname))); + + if (databaseForm->datdba != InvalidOid) + { + appendStringInfo(&str, " OWNER = %s", GetUserNameFromId(databaseForm->datdba, + false)); + } + + if (databaseForm->encoding != -1) + { + appendStringInfo(&str, " ENCODING = '%s'", pg_encoding_to_char( + databaseForm->encoding)); + } + + if (collInfo.collation != NULL) + { + appendStringInfo(&str, " LC_COLLATE = '%s'", collInfo.collation); + } + if (collInfo.ctype != NULL) + { + appendStringInfo(&str, " LC_CTYPE = '%s'", collInfo.ctype); + } + + if (collInfo.icu_locale != NULL) + { + appendStringInfo(&str, " ICU_LOCALE = '%s'", collInfo.icu_locale); + } + + if (databaseForm->datlocprovider != 0) + { + appendStringInfo(&str, " LOCALE_PROVIDER = '%c'", databaseForm->datlocprovider); + } + + if (collInfo.collversion != NULL) + { + appendStringInfo(&str, " COLLATION_VERSION = '%s'", collInfo.collversion); + } + + if (databaseForm->dattablespace != InvalidOid) + { + appendStringInfo(&str, " TABLESPACE = %s", quote_identifier(GetTablespaceName( + databaseForm-> + dattablespace))); + } + + appendStringInfo(&str, " ALLOW_CONNECTIONS = '%s'", databaseForm->datallowconn ? + "true" : "false"); + + if (databaseForm->datconnlimit >= 0) + { + appendStringInfo(&str, " CONNECTION LIMIT %d", databaseForm->datconnlimit); + } + + appendStringInfo(&str, " IS_TEMPLATE = '%s'", databaseForm->datistemplate ? "true" : + "false"); + + FreeDatabaseCollationInfo(collInfo); + + + return str.data; +} + + +/* + * GenerateCreateDatabaseCommandList is gets the pg_database tuples and returns the CREATE DATABASE statement list + * for all the databases in the cluster.citus_internal_database_command UDF is used to send the CREATE DATABASE + * statement to the workers since the CREATE DATABASE statement gives error in transaction context. + */ +List * +GenerateCreateDatabaseCommandList(void) +{ + List *commands = NIL; + HeapTuple tuple; + Relation pgDatabaseRel; + TableScanDesc scan; + + pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock); + scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple); + + char *createStmt = GenerateCreateDatabaseStatementFromPgDatabase(databaseForm); + + + StringInfo outerDbStmt; + outerDbStmt = makeStringInfo(); + + /* Generate the CREATE DATABASE statement */ + appendStringInfo(outerDbStmt, + "select pg_catalog.citus_internal_database_command( %s)", + quote_literal_cstr( + createStmt)); + + elog(LOG, "outerDbStmt: %s", outerDbStmt->data); + + /* Add the statement to the list of commands */ + commands = lappend(commands, outerDbStmt->data); + } + + heap_endscan(scan); + table_close(pgDatabaseRel, AccessShareLock); + + return commands; +} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 40bdae0ea..54fa801ae 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -4501,6 +4501,13 @@ PropagateNodeWideObjectsCommandList(void) /* collect all commands */ List *ddlCommands = NIL; + if (EnableCreateDatabasePropagation) + { + /* Get commands for database creation */ + List *createDatabaseCommands = GenerateCreateDatabaseCommandList(); + ddlCommands = list_concat(ddlCommands, createDatabaseCommands); + } + if (EnableAlterRoleSetPropagation) { /* diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 06dda53eb..a4f890f9e 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -244,6 +244,7 @@ extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern List * GenerateCreateDatabaseCommandList(void); /* domain.c - forward declarations */