Adds database comment metadata sync

pull/7388/head
gurkanindibay 2023-12-26 07:19:52 +03:00
parent 856a53ab21
commit db5f04c4e3
3 changed files with 77 additions and 9 deletions

View File

@ -13,6 +13,7 @@
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
@ -45,6 +46,12 @@
#include "distributed/worker_transaction.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "catalog/pg_shdescription.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
/*
* DatabaseCollationInfo is used to store collation related information of a database.
*/
@ -75,6 +82,7 @@ static ObjectAddress * GetDatabaseAddressFromDatabaseName(char *databaseName,
bool missingOk);
static Oid get_database_owner(Oid dbId);
static List * GetDatabaseCommentPropagationCommands(Oid databaseOid);
/* controlled via GUC */
@ -569,6 +577,28 @@ GetTablespaceName(Oid tablespaceOid)
}
/*
* GetDatabaseMetadataSyncCommands returns a list of sql statements
* for the given database id. The list contains the database ddl command,
* grant commands and comment propagation commands.
*/
List *
GetDatabaseMetadataSyncCommands(Oid dbOid)
{
char *databaseDDLCommand = CreateDatabaseDDLCommand(dbOid);
List *ddlCommands = list_make1(databaseDDLCommand);
List *grantDDLCommands = GrantOnDatabaseDDLCommands(dbOid);
List *commentDDLCommands = GetDatabaseCommentPropagationCommands(dbOid);
ddlCommands = list_concat(ddlCommands, grantDDLCommands);
ddlCommands = list_concat(ddlCommands, commentDDLCommands);
return ddlCommands;
}
/*
* GetDatabaseCollation gets oid of a database and returns all the collation related information
* We need this method since collation related info in Form_pg_database is not accessible.
@ -788,3 +818,48 @@ DatabaseCommentObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
*objectAddressCopy = objectAddress;
return list_make1(objectAddressCopy);
}
static List *
GetDatabaseCommentPropagationCommands(Oid databaseOid)
{
Relation shdescRelation;
SysScanDesc scan;
HeapTuple tuple;
List *commands = NIL;
/* Open pg_shdescription catalog */
shdescRelation = table_open(SharedDescriptionRelationId, AccessShareLock);
/* Scan the table */
scan = systable_beginscan(shdescRelation, InvalidOid, false, NULL, 0, NULL);
while ((tuple = systable_getnext(scan)) != NULL)
{
Form_pg_shdescription shdesc = (Form_pg_shdescription) GETSTRUCT(tuple);
bool isNull = false;
TupleDesc tupdesc = RelationGetDescr(shdescRelation);
Datum descDatum = heap_getattr(tuple, Anum_pg_shdescription_description, tupdesc,
&isNull);
/* Check if the objoid matches the databaseOid */
if (shdesc->objoid == databaseOid)
{
/* Create the SQL command to propagate the comment to other nodes */
char *databaseName = get_database_name(databaseOid);
char *command = psprintf("COMMENT ON DATABASE %s IS '%s';", databaseName,
TextDatumGetCString(descDatum));
/* Add the command to the list */
commands = lappend(commands, command);
}
}
/* End the scan and close the catalog */
systable_endscan(scan);
table_close(shdescRelation, AccessShareLock);
return commands;
}

View File

@ -478,15 +478,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
*/
if (dependency->objectId != MyDatabaseId && EnableCreateDatabasePropagation)
{
char *databaseDDLCommand = CreateDatabaseDDLCommand(dependency->objectId);
List *ddlCommands = list_make1(databaseDDLCommand);
List *grantDDLCommands = GrantOnDatabaseDDLCommands(dependency->objectId);
ddlCommands = list_concat(ddlCommands, grantDDLCommands);
return ddlCommands;
return GetDatabaseMetadataSyncCommands(dependency->objectId);
}
return NIL;

View File

@ -230,6 +230,7 @@ extern List * PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * GetDatabaseMetadataSyncCommands(Oid dbOid);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,