Merge branch 'grant_database_2pc' into create_user_2pc

pull/7461/head
Gürkan İndibay 2024-01-29 23:32:37 +03:00 committed by GitHub
commit e7b389f420
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 114 additions and 49 deletions

View File

@ -4,7 +4,22 @@ set -euo pipefail
# shellcheck disable=SC1091 # shellcheck disable=SC1091
source ci/ci_helpers.sh source ci/ci_helpers.sh
# Find the line that exactly matches "RegisterCitusConfigVariables(void)" in
# shared_library_init.c. grep command returns something like
# "934:RegisterCitusConfigVariables(void)" and we extract the line number
# with cut.
RegisterCitusConfigVariables_begin_linenumber=$(grep -n "^RegisterCitusConfigVariables(void)$" src/backend/distributed/shared_library_init.c | cut -d: -f1)
# Consider the lines starting from $RegisterCitusConfigVariables_begin_linenumber,
# grep the first line that starts with "}" and extract the line number with cut
# as in the previous step.
RegisterCitusConfigVariables_length=$(tail -n +$RegisterCitusConfigVariables_begin_linenumber src/backend/distributed/shared_library_init.c | grep -n -m 1 "^}$" | cut -d: -f1)
# extract the function definition of RegisterCitusConfigVariables into a temp file
tail -n +$RegisterCitusConfigVariables_begin_linenumber src/backend/distributed/shared_library_init.c | head -n $(($RegisterCitusConfigVariables_length)) > RegisterCitusConfigVariables_func_def.out
# extract citus gucs in the form of <tab><tab>"citus.X" # extract citus gucs in the form of <tab><tab>"citus.X"
grep -P "^[\t][\t]\"citus\.[a-zA-Z_0-9]+\"" src/backend/distributed/shared_library_init.c > gucs.out grep -P "^[\t][\t]\"citus\.[a-zA-Z_0-9]+\"" RegisterCitusConfigVariables_func_def.out > gucs.out
sort -c gucs.out sort -c gucs.out
rm gucs.out rm gucs.out
rm RegisterCitusConfigVariables_func_def.out

View File

@ -96,25 +96,34 @@
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)" "SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
/* /*
* TwoPcStatementInfo is used to determine whether a statement is supported in 2PC * NonMainDbDistributedStatementInfo is used to determine whether a statement is
* and whether it should be marked as distributed in 2PC. * supported from non-main databases and whether it should be marked as
* distributed explicitly (*).
*
* We always have to mark such the objects created "as distributed" but while for
* some object types we can delegate this to main database, for some others we have
* to explicitly send a command to all nodes in this code-path to achieve this.
*/ */
typedef struct TwoPcStatementInfo typedef struct NonMainDbDistributedStatementInfo
{ {
int statementType; int statementType;
bool explicitlyMarkAsDistributed;
ObjectType *supportedObjectTypes; ObjectType *supportedObjectTypes;
int supportedObjectTypesSize; int supportedObjectTypesSize;
bool markAsDistributed; } NonMainDbDistributedStatementInfo;
} TwoPcStatementInfo;
typedef struct ObjectInfo
{
char *name;
Oid id;
} ObjectInfo;
/* /*
* twoPcSupportedStatements is a list of statements that are supported in 2PC. * NonMainDbSupportedStatements is an array of statements that are supported
* The list is used to determine whether a statement is supported in 2PC and * from non-main databases.
* whether it should be marked as distributed in 2PC.
* We use this array to avoid hardcoding the list of supported statements in
* multiple places.
*/ */
ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE }; ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE };
TwoPcStatementInfo twoPcSupportedStatements[] = { TwoPcStatementInfo twoPcSupportedStatements[] = {
{ T_GrantRoleStmt, NULL, 0, false }, { T_GrantRoleStmt, NULL, 0, false },
{ T_CreateRoleStmt, NULL, 0, true }, { T_CreateRoleStmt, NULL, 0, true },
@ -122,6 +131,14 @@ TwoPcStatementInfo twoPcSupportedStatements[] = {
{ T_AlterRoleStmt, NULL, 0, false }, { T_AlterRoleStmt, NULL, 0, false },
{ T_GrantStmt, supportedObjectTypesForGrantStmt, { T_GrantStmt, supportedObjectTypesForGrantStmt,
sizeof(supportedObjectTypesForGrantStmt) / sizeof(ObjectType), true } sizeof(supportedObjectTypesForGrantStmt) / sizeof(ObjectType), true }
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = {
{ T_GrantRoleStmt, false, NULL, 0 },
{ T_CreateRoleStmt, true, NULL, 0 },
{ T_DropRoleStmt, true,NULL, 0 },
{ T_AlterRoleStmt, false, NULL, 0 },
{ T_GrantStmt, false, supportedObjectTypesForGrantStmt,
sizeof(supportedObjectTypesForGrantStmt) / sizeof(ObjectType) }
}; };
@ -154,10 +171,12 @@ static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void); static bool ShouldCheckUndistributeCitusLocalTables(void);
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString); static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
static void RunPostprocessMainDBCommand(Node *parsetree); static void RunPostprocessMainDBCommand(Node *parsetree);
static bool IsStatementSupportedIn2PC(Node *parsetree); static bool IsObjectTypeSupported(Node *parsetree, NonMainDbDistributedStatementInfo
static bool DoesStatementRequireMarkDistributedFor2PC(Node *parsetree); nonMainDbDistributedStatementInfo);
static bool IsObjectTypeSupported(Node *parsetree, TwoPcStatementInfo static bool IsStatementSupportedInNonMainDb(Node *parsetree);
twoPcSupportedStatement); static bool StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree);
static ObjectInfo GetObjectInfo(Node *parsetree);
static void MarkObjectDistributedInNonMainDb(Node *parsetree, ObjectInfo objectInfo);
/* /*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
@ -1637,7 +1656,7 @@ DropSchemaOrDBInProgress(void)
static void static void
RunPreprocessMainDBCommand(Node *parsetree, const char *queryString) RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
{ {
if (!IsStatementSupportedIn2PC(parsetree)) if (!IsStatementSupportedInNonMainDb(parsetree))
{ {
return; return;
} }
@ -1663,42 +1682,69 @@ RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
static void static void
RunPostprocessMainDBCommand(Node *parsetree) RunPostprocessMainDBCommand(Node *parsetree)
{ {
if (!IsStatementSupportedIn2PC(parsetree) || if (IsStatementSupportedInNonMainDb(parsetree) &&
!DoesStatementRequireMarkDistributedFor2PC(parsetree)) StatementRequiresMarkDistributedFromNonMainDb(parsetree))
{ {
return; ObjectInfo objectInfo = GetObjectInfo(parsetree);
} MarkObjectDistributedInNonMainDb(parsetree, objectInfo);
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree);
Oid roleOid = get_role_oid(createRoleStmt->role, false);
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role),
roleOid,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
} }
} }
/* /*
* IsStatementSupportedIn2Pc returns true if the statement is supported in 2pc * GetObjectInfo returns the name and oid of the object in the given parsetree.
*/
static ObjectInfo
GetObjectInfo(Node *parsetree)
{
ObjectInfo info;
if (IsA(parsetree, CreateRoleStmt))
{
CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree);
info.name = stmt->role;
info.id = get_role_oid(stmt->role, false);
}
/* Add else if branches for other statement types */
return info;
}
/*
* MarkObjectDistributedInNonMainDb marks the given object as distributed on the
* non-main database.
*/
static void
MarkObjectDistributedInNonMainDb(Node *parsetree, ObjectInfo objectInfo)
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
quote_literal_cstr(objectInfo.name),
objectInfo.id,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
/*
* IsStatementSupportedIn2Pc returns true if the statement is supported from a
* non-main database.
*/ */
static bool static bool
IsStatementSupportedIn2PC(Node *parsetree) IsStatementSupportedInNonMainDb(Node *parsetree)
{ {
NodeTag type = nodeTag(parsetree); NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(twoPcSupportedStatements) / for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(twoPcSupportedStatements[0]); i++) sizeof(NonMainDbSupportedStatements[0]); i++)
{ {
if (type == twoPcSupportedStatements[i].statementType) if (type == NonMainDbSupportedStatements[i].statementType)
{ {
if (twoPcSupportedStatements[i].supportedObjectTypes == NULL) if (NonMainDbSupportedStatements[i].supportedObjectTypes == NULL)
{ {
return true; return true;
} }
@ -1706,7 +1752,8 @@ IsStatementSupportedIn2PC(Node *parsetree)
{ {
if (type == T_GrantStmt) if (type == T_GrantStmt)
{ {
return IsObjectTypeSupported(parsetree, twoPcSupportedStatements[i]); return IsObjectTypeSupported(parsetree,
NonMainDbSupportedStatements[i]);
} }
} }
} }
@ -1720,7 +1767,8 @@ IsStatementSupportedIn2PC(Node *parsetree)
* IsObjectTypeSupported returns true if the object type is supported in 2pc * IsObjectTypeSupported returns true if the object type is supported in 2pc
*/ */
bool bool
IsObjectTypeSupported(Node *parsetree, TwoPcStatementInfo twoPcSupportedStatement) IsObjectTypeSupported(Node *parsetree, NonMainDbDistributedStatementInfo
nonMainDbDistributedStatementInfo)
{ {
NodeTag type = nodeTag(parsetree); NodeTag type = nodeTag(parsetree);
if (type == T_GrantStmt) if (type == T_GrantStmt)
@ -1728,9 +1776,11 @@ IsObjectTypeSupported(Node *parsetree, TwoPcStatementInfo twoPcSupportedStatemen
GrantStmt *stmt = castNode(GrantStmt, parsetree); GrantStmt *stmt = castNode(GrantStmt, parsetree);
/* check if stmt->objtype is in supportedObjectTypes */ /* check if stmt->objtype is in supportedObjectTypes */
for (int j = 0; j < twoPcSupportedStatement.supportedObjectTypesSize; j++) for (int j = 0; j < nonMainDbDistributedStatementInfo.supportedObjectTypesSize;
j++)
{ {
if (stmt->objtype == twoPcSupportedStatement.supportedObjectTypes[j]) if (stmt->objtype ==
nonMainDbDistributedStatementInfo.supportedObjectTypes[j])
{ {
return true; return true;
} }
@ -1743,19 +1793,19 @@ IsObjectTypeSupported(Node *parsetree, TwoPcStatementInfo twoPcSupportedStatemen
/* /*
* DoesStatementRequireMarkDistributedFor2PC returns true if the statement should be marked * DoesStatementRequireMarkDistributedFor2PC returns true if the statement should be marked
* as distributed in 2pc * as distributed when executed from a non-main database.
*/ */
static bool static bool
DoesStatementRequireMarkDistributedFor2PC(Node *parsetree) StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree)
{ {
NodeTag type = nodeTag(parsetree); NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(twoPcSupportedStatements) / for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(twoPcSupportedStatements[0]); i++) sizeof(NonMainDbSupportedStatements[0]); i++)
{ {
if (type == twoPcSupportedStatements[i].statementType) if (type == NonMainDbSupportedStatements[i].statementType)
{ {
return twoPcSupportedStatements[i].markAsDistributed; return NonMainDbSupportedStatements[i].explicitlyMarkAsDistributed;
} }
} }