Adds metadata sync for db and role

pull/7388/head
gurkanindibay 2023-12-26 12:35:15 +03:00
parent db5f04c4e3
commit dce02a49b0
9 changed files with 292 additions and 49 deletions

View File

@ -0,0 +1,111 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "catalog/pg_shdescription.h"
#include "server/nodes/parsenodes.h"
#include "server/access/table.h"
#include "server/utils/rel.h"
#include "utils/builtins.h"
#include "distributed/comment.h"
const CommentStmtType commentStmtTypes[] = {
{ OBJECT_DATABASE, "DATABASE" },
{ OBJECT_ROLE, "ROLE" }
};
static char * GetCommentObjectType(ObjectType objectType);
static char * GetCommentForObject(Oid oid);
List *
GetCommentPropagationCommands(Oid oid,char *objectName, ObjectType objectType)
{
List *commands = NIL;
StringInfo commentStmt = makeStringInfo();
/* Get the comment for the database */
char *comment = GetCommentForObject(oid);
char *commentObjectType = GetCommentObjectType(objectType);
/* Create the SQL command to propagate the comment to other nodes */
if (comment == NULL)
{
appendStringInfo(commentStmt, "COMMENT ON %s %s IS NULL;", commentObjectType, quote_identifier(objectName));
}
else
{
appendStringInfo(commentStmt, "COMMENT ON %s %s IS %s;", commentObjectType, quote_identifier(objectName),
quote_literal_cstr(comment));
}
/* Add the command to the list */
commands = list_make1(commentStmt->data);
return commands;
}
static char * GetCommentObjectType(ObjectType objectType){
char *objectName = NULL;
for(int i = 0; i < sizeof(commentStmtTypes)/sizeof(CommentStmtType); i++){
if(commentStmtTypes[i].objectType == objectType){
objectName = commentStmtTypes[i].objectName;
break;
}
}
return objectName;
}
static char *
GetCommentForObject(Oid oid)
{
Relation shdescRelation;
SysScanDesc scan;
HeapTuple tuple;
char *comment = NULL;
/* Open pg_shdescription catalog */
shdescRelation = table_open(SharedDescriptionRelationId, AccessShareLock);
/* Scan the table */
scan = systable_beginscan(shdescRelation, InvalidOid, false, NULL, 0, NULL);
while ((tuple = systable_getnext(scan)) != NULL)
{
Form_pg_shdescription shdesc = (Form_pg_shdescription) GETSTRUCT(tuple);
bool isNull = false;
TupleDesc tupdesc = RelationGetDescr(shdescRelation);
Datum descDatum = heap_getattr(tuple, Anum_pg_shdescription_description, tupdesc,
&isNull);
/* Check if the objoid matches the databaseOid */
if (shdesc->objoid == oid)
{
/* Add the command to the list */
if (!isNull)
{
comment = TextDatumGetCString(descDatum);
}
else
{
comment = NULL;
}
break;
}
}
/* End the scan and close the catalog */
systable_endscan(scan);
table_close(shdescRelation, AccessShareLock);
return comment;
}

View File

@ -33,6 +33,7 @@
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
#include "distributed/comment.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
@ -48,7 +49,6 @@
#include "access/htup_details.h"
#include "access/table.h"
#include "catalog/pg_shdescription.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@ -70,6 +70,7 @@ typedef struct DatabaseCollationInfo
#endif
} DatabaseCollationInfo;
static char * GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database
databaseForm);
static DatabaseCollationInfo GetDatabaseCollation(Oid dbOid);
@ -82,7 +83,6 @@ static ObjectAddress * GetDatabaseAddressFromDatabaseName(char *databaseName,
bool missingOk);
static Oid get_database_owner(Oid dbId);
static List * GetDatabaseCommentPropagationCommands(Oid databaseOid);
/* controlled via GUC */
@ -585,12 +585,14 @@ GetTablespaceName(Oid tablespaceOid)
List *
GetDatabaseMetadataSyncCommands(Oid dbOid)
{
char *databaseName = get_database_name(dbOid);
char *databaseDDLCommand = CreateDatabaseDDLCommand(dbOid);
List *ddlCommands = list_make1(databaseDDLCommand);
List *grantDDLCommands = GrantOnDatabaseDDLCommands(dbOid);
List *commentDDLCommands = GetDatabaseCommentPropagationCommands(dbOid);
List *commentDDLCommands = GetCommentPropagationCommands(dbOid, databaseName,
OBJECT_DATABASE);
ddlCommands = list_concat(ddlCommands, grantDDLCommands);
ddlCommands = list_concat(ddlCommands, commentDDLCommands);
@ -818,48 +820,3 @@ DatabaseCommentObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
*objectAddressCopy = objectAddress;
return list_make1(objectAddressCopy);
}
static List *
GetDatabaseCommentPropagationCommands(Oid databaseOid)
{
Relation shdescRelation;
SysScanDesc scan;
HeapTuple tuple;
List *commands = NIL;
/* Open pg_shdescription catalog */
shdescRelation = table_open(SharedDescriptionRelationId, AccessShareLock);
/* Scan the table */
scan = systable_beginscan(shdescRelation, InvalidOid, false, NULL, 0, NULL);
while ((tuple = systable_getnext(scan)) != NULL)
{
Form_pg_shdescription shdesc = (Form_pg_shdescription) GETSTRUCT(tuple);
bool isNull = false;
TupleDesc tupdesc = RelationGetDescr(shdescRelation);
Datum descDatum = heap_getattr(tuple, Anum_pg_shdescription_description, tupdesc,
&isNull);
/* Check if the objoid matches the databaseOid */
if (shdesc->objoid == databaseOid)
{
/* Create the SQL command to propagate the comment to other nodes */
char *databaseName = get_database_name(databaseOid);
char *command = psprintf("COMMENT ON DATABASE %s IS '%s';", databaseName,
TextDatumGetCString(descDatum));
/* Add the command to the list */
commands = lappend(commands, command);
}
}
/* End the scan and close the catalog */
systable_endscan(scan);
table_close(shdescRelation, AccessShareLock);
return commands;
}

View File

@ -44,6 +44,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands.h"
#include "distributed/comment.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
@ -582,6 +583,17 @@ GenerateCreateOrAlterRoleCommand(Oid roleOid)
{
completeRoleList = lappend(completeRoleList, DeparseTreeNode(stmt));
}
/*
* append COMMENT ON ROLE commands for this specific user
* When we propagate user creation, we also want to make sure that we propagate
* all the comments it has been given. For this, we check pg_shdescription
* for the ROLE entry corresponding to roleOid, and generate the relevant
* Comment stmts to be run in the new node.
*/
List *commentStmts = GetCommentPropagationCommands(roleOid, rolename,
OBJECT_ROLE);
completeRoleList = list_concat(completeRoleList, commentStmts);
}
return completeRoleList;

View File

@ -0,0 +1,12 @@
#include "postgres.h"
#include "server/nodes/parsenodes.h"
typedef struct CommentStmtType
{
ObjectType objectType;
char *objectName;
} CommentStmtType;
extern List * GetCommentPropagationCommands(Oid oid, char *objectName, ObjectType objectType);

View File

@ -48,6 +48,51 @@ SELECT result FROM run_command_on_all_nodes(
(3 rows)
drop DATABASE "test1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
(2 rows)
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
drop DATABASE "test1-\!escape";

View File

@ -47,6 +47,51 @@ SELECT result FROM run_command_on_all_nodes(
(3 rows)
drop role "role1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
(2 rows)
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
result
---------------------------------------------------------------------
test-comment
test-comment
test-comment
(3 rows)
drop role "role1-\!escape";

View File

@ -38,6 +38,8 @@ test: create_single_shard_table
test: create_drop_database_propagation
test: create_drop_database_propagation_pg15
test: create_drop_database_propagation_pg16
test: comment_on_database
test: comment_on_role
# don't parallelize single_shard_table_udfs to make sure colocation ids are sequential
test: single_shard_table_udfs
test: schema_based_sharding
@ -59,7 +61,7 @@ test: grant_on_database_propagation
test: alter_database_propagation
test: citus_shards
test: comment_on_database comment_on_role
# ----------
# multi_citus_tools tests utility functions written for citus tools

View File

@ -39,6 +39,35 @@ SELECT result FROM run_command_on_all_nodes(
);
drop DATABASE "test1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
create database "test1-\!escape";
comment on DATABASE "test1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
select 1 from citus_add_node('localhost', :worker_2_port);
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS database_comment
FROM pg_database d
LEFT JOIN pg_shdescription ds ON d.oid = ds.objoid
WHERE d.datname = 'test1-\!escape';
$$
);
drop DATABASE "test1-\!escape";
reset citus.enable_create_database_propagation;
reset citus.grep_remote_commands;
reset citus.log_remote_commands;

View File

@ -38,5 +38,35 @@ SELECT result FROM run_command_on_all_nodes(
);
drop role "role1-\!escape";
--test metadata sync
select 1 from citus_remove_node('localhost', :worker_2_port);
create role "role1-\!escape";
comment on ROLE "role1-\!escape" is 'test-comment';
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
select 1 from citus_add_node('localhost', :worker_2_port);
SELECT result FROM run_command_on_all_nodes(
$$
SELECT ds.description AS role_comment
FROM pg_roles r
LEFT JOIN pg_shdescription ds ON r.oid = ds.objoid
WHERE r.rolname = 'role1-\!escape';
$$
);
drop role "role1-\!escape";
reset citus.grep_remote_commands;
reset citus.log_remote_commands;