phase4: improve the infra and add nice comments

non-main-redesign-suggest
Onur Tirtir 2024-02-29 00:50:42 +03:00
parent 3bdecb10a0
commit cd20ab76c0
1 changed files with 282 additions and 281 deletions

View File

@ -5,6 +5,15 @@
* Routines to support node-wide object management commands from non-main * Routines to support node-wide object management commands from non-main
* databases. * databases.
* *
* RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand are
* the entrypoints for this module. These functions are called from
* utility_hook.c to support some of the node-wide object management
* commands from non-main databases.
*
* To add support for a new command type, one needs to define a new
* NonMainDbDistributeObjectOps object and add it to
* GetNonMainDbDistributeObjectOps.
*
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -33,99 +42,130 @@
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)" "SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)"
/* see NonMainDbDistributedStatementInfo for the explanation of these flags */
typedef enum DistObjectOperation
{
NO_DIST_OBJECT_OPERATION,
MARK_DISTRIBUTED_GLOBALLY,
UNMARK_DISTRIBUTED_LOCALLY
} DistObjectOperation;
/* /*
* NonMainDbDistributedStatementInfo is used to determine whether a statement is * Structs used to implement getMarkDistributedParams and
* supported from non-main databases and whether it should be marked or unmarked * getUnmarkDistributedParams callbacks for NonMainDbDistributeObjectOps.
* as distributed.
* *
* When creating a distributed object, we always have to mark such objects as * Main difference between these two is that while
* "distributed" but while for some object types we can delegate this to main * MarkDistributedGloballyParams contains the name of the object, the other
* database, for some others we have to explicitly send a command to all nodes * doesn't. This is because, when marking an object -that is created from a
* in this code-path to achieve this. Callers need to provide * non-main db- as distributed, citus_internal.mark_object_distributed()
* MARK_DISTRIBUTED_GLOBALLY when that is the case. * cannot find its name since the object is not visible to outer transaction
* * (or, read as "the transaction in non-main db").
* Similarly when dropping a distributed object, we always have to unmark such
* objects as "distributed" and our utility hook on remote nodes achieve this
* via UnmarkNodeWideObjectsDistributed() because the commands that we send to
* workers are executed via main db. However for the local node, this is not the
* case as we're not in the main db. For this reason, callers need to provide
* UNMARK_DISTRIBUTED_LOCALLY to unmark an object for local node as well.
*/ */
typedef struct NonMainDbDistributedStatementInfo typedef struct MarkDistributedGloballyParams
{
int statementType;
DistObjectOperation distObjectOperation;
/*
* checkSupportedObjectTypes is a callback function that checks whether
* type of the object referred to by given statement is supported.
*
* Can be NULL if not applicable for the statement type.
*/
bool (*checkSupportedObjectTypes)(Node *node);
/* indicates whether the statement cannot be executed in a transaction */
bool cannotBeExecutedInTransaction;
} NonMainDbDistributedStatementInfo;
/*
* DistObjectOperationParams is used to pass parameters to the
* MarkObjectDistributedGloballyFromNonMainDb function and
* UnMarkObjectDistributedLocallyFromNonMainDb functions.
*/
typedef struct DistObjectOperationParams
{ {
char *name; char *name;
Oid id; Oid id;
uint16 catalogRelId; uint16 catalogRelId;
} DistObjectOperationParams; } MarkDistributedGloballyParams;
typedef struct UnmarkDistributedLocallyParams
{
Oid id;
uint16 catalogRelId;
} UnmarkDistributedLocallyParams;
/*
* NonMainDbDistributeObjectOps contains the necessary callbacks / flags to
* support node-wide object management commands from non-main databases.
*
* getMarkDistributedParams / getUnmarkDistributedParams:
* When creating a distributed object, we always have to mark such objects
* as "distributed" but while for some object types we can delegate this to
* main database, for some others we have to explicitly send a command to
* all nodes in this code-path to achieve this. Callers need to implement
* getMarkDistributedParams when that is the case.
*
* Similarly when dropping a distributed object, we always have to unmark
* the target object as "distributed" and our utility hook on remote nodes
* achieve this via UnmarkNodeWideObjectsDistributed() because the commands
* that we send to workers are executed via main db. However for the local
* node, this is not the case as we're not in the main db. For this reason,
* callers need to implement getUnmarkDistributedParams to unmark an object
* for local node as well.
*
* We don't expect both of these to be NULL at the same time. However, it's
* okay if both of these are NULL.
*
* Finally, while getMarkDistributedParams is expected to return "a list of
* objects", this is not the case for getUnmarkDistributedParams. This is
* because, while we expect that a drop command might drop multiple objects
* at once, we don't expect a create command to create multiple objects at
* once.
*
* cannotBeExecutedInTransaction:
* Indicates whether the statement cannot be executed in a transaction. If
* this is set to true, the statement will be executed directly on the main
* database because there are no transactional visibility issues for such
* commands.
*
* And when this is true, we don't expect getMarkDistributedParams and
* getUnmarkDistributedParams to be implemented.
*/
typedef struct NonMainDbDistributeObjectOps
{
MarkDistributedGloballyParams * (*getMarkDistributedParams)(Node * parsetree);
List * (*getUnmarkDistributedParams)(Node * parsetree);
bool cannotBeExecutedInTransaction;
} NonMainDbDistributeObjectOps;
/* /*
* checkSupportedObjectTypes callbacks for * getMarkDistributedParams and getUnmarkDistributedParams callbacks for
* NonMainDbDistributedStatementInfo objects. * NonMainDbDistributeObjectOps.
*/ */
static bool NonMainDbCheckSupportedObjectTypeForCreateDatabase(Node *node); static MarkDistributedGloballyParams * CreateRoleStmtGetMarkDistributedParams(
static bool NonMainDbCheckSupportedObjectTypeForDropDatabase(Node *node); Node *parsetree);
static bool NonMainDbCheckSupportedObjectTypeForGrant(Node *node); static List * DropRoleStmtGetUnmarkDistributedParams(Node *parsetree);
static bool NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node);
/*
* NonMainDbSupportedStatements is an array of statements that are supported /* NonMainDbDistributeObjectOps for different command types */
* from non-main databases. static NonMainDbDistributeObjectOps Any_CreateRole = {
*/ .getMarkDistributedParams = CreateRoleStmtGetMarkDistributedParams,
ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE }; .getUnmarkDistributedParams = NULL,
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = { .cannotBeExecutedInTransaction = false
{ T_GrantRoleStmt, NO_DIST_OBJECT_OPERATION, NULL, false }, };
{ T_CreateRoleStmt, MARK_DISTRIBUTED_GLOBALLY, NULL, false }, static NonMainDbDistributeObjectOps Any_DropRole = {
{ T_DropRoleStmt, UNMARK_DISTRIBUTED_LOCALLY, NULL, false }, .getMarkDistributedParams = NULL,
{ T_AlterRoleStmt, NO_DIST_OBJECT_OPERATION, NULL, false }, .getUnmarkDistributedParams = DropRoleStmtGetUnmarkDistributedParams,
{ T_GrantStmt, NO_DIST_OBJECT_OPERATION, .cannotBeExecutedInTransaction = false
NonMainDbCheckSupportedObjectTypeForGrant, false }, };
{ T_CreatedbStmt, NO_DIST_OBJECT_OPERATION, static NonMainDbDistributeObjectOps Any_AlterRole = {
NonMainDbCheckSupportedObjectTypeForCreateDatabase, true }, .getMarkDistributedParams = NULL,
{ T_DropdbStmt, NO_DIST_OBJECT_OPERATION, .getUnmarkDistributedParams = NULL,
NonMainDbCheckSupportedObjectTypeForDropDatabase, true }, .cannotBeExecutedInTransaction = false
{ T_SecLabelStmt, NO_DIST_OBJECT_OPERATION, };
NonMainDbCheckSupportedObjectTypeForSecLabel, false }, static NonMainDbDistributeObjectOps Any_GrantRole = {
.cannotBeExecutedInTransaction = false
};
static NonMainDbDistributeObjectOps Database_Create = {
.getMarkDistributedParams = NULL,
.getUnmarkDistributedParams = NULL,
.cannotBeExecutedInTransaction = true
};
static NonMainDbDistributeObjectOps Database_Drop = {
.getMarkDistributedParams = NULL,
.getUnmarkDistributedParams = NULL,
.cannotBeExecutedInTransaction = true
};
static NonMainDbDistributeObjectOps Database_Grant = {
.getMarkDistributedParams = NULL,
.getUnmarkDistributedParams = NULL,
.cannotBeExecutedInTransaction = false
};
static NonMainDbDistributeObjectOps Role_SecLabel = {
.getMarkDistributedParams = NULL,
.getUnmarkDistributedParams = NULL,
.cannotBeExecutedInTransaction = false
}; };
static bool IsStatementSupportedFromNonMainDb(Node *parsetree); /* other static function declarations */
static bool StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree); const NonMainDbDistributeObjectOps * GetNonMainDbDistributeObjectOps(Node *parsetree);
static bool StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree); static void MarkObjectDistributedGloballyOnMainDbs(
static bool StatementCannotBeExecutedInTransaction(Node *parsetree); MarkDistributedGloballyParams *markDistributedParams);
static void MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree); static void UnmarkObjectDistributedOnLocalMainDb(List *unmarkDistributedList);
static void UnMarkObjectDistributedLocallyFromNonMainDb(List *unmarkDistributedList);
static List * GetDistObjectOperationParams(Node *parsetree);
/* /*
@ -138,7 +178,13 @@ static List * GetDistObjectOperationParams(Node *parsetree);
bool bool
RunPreprocessNonMainDBCommand(Node *parsetree) RunPreprocessNonMainDBCommand(Node *parsetree)
{ {
if (IsMainDB || !IsStatementSupportedFromNonMainDb(parsetree)) if (IsMainDB)
{
return false;
}
const NonMainDbDistributeObjectOps *ops = GetNonMainDbDistributeObjectOps(parsetree);
if (!ops)
{ {
return false; return false;
} }
@ -148,9 +194,9 @@ RunPreprocessNonMainDBCommand(Node *parsetree)
/* /*
* For the commands that cannot be executed in a transaction, there are no * For the commands that cannot be executed in a transaction, there are no
* transactional visibility issues. We directly route them to main database * transactional visibility issues. We directly route them to main database
* so that we only have to consider one code-path when creating databases. * so that we only have to consider one code-path for such commands.
*/ */
if (StatementCannotBeExecutedInTransaction(parsetree)) if (ops->cannotBeExecutedInTransaction)
{ {
IsMainDBCommandInXact = false; IsMainDBCommandInXact = false;
RunCitusMainDBQuery((char *) queryString); RunCitusMainDBQuery((char *) queryString);
@ -172,10 +218,10 @@ RunPreprocessNonMainDBCommand(Node *parsetree)
quote_literal_cstr(CurrentUserName())); quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data); RunCitusMainDBQuery(mainDBQuery->data);
if (StatementRequiresUnmarkDistributedLocallyFromNonMainDb(parsetree)) if (ops->getUnmarkDistributedParams)
{ {
List *unmarkParams = GetDistObjectOperationParams(parsetree); List *unmarkDistributedParamsList = ops->getUnmarkDistributedParams(parsetree);
UnMarkObjectDistributedLocallyFromNonMainDb(unmarkParams); UnmarkObjectDistributedOnLocalMainDb(unmarkDistributedParamsList);
} }
return false; return false;
@ -189,154 +235,165 @@ RunPreprocessNonMainDBCommand(Node *parsetree)
void void
RunPostprocessNonMainDBCommand(Node *parsetree) RunPostprocessNonMainDBCommand(Node *parsetree)
{ {
if (IsMainDB || !IsStatementSupportedFromNonMainDb(parsetree)) if (IsMainDB)
{ {
return; return;
} }
if (StatementRequiresMarkDistributedGloballyFromNonMainDb(parsetree)) const NonMainDbDistributeObjectOps *ops = GetNonMainDbDistributeObjectOps(parsetree);
if (!ops)
{ {
MarkObjectDistributedGloballyFromNonMainDb(parsetree); return;
}
if (ops->getMarkDistributedParams)
{
MarkDistributedGloballyParams *markDistributedParams =
ops->getMarkDistributedParams(parsetree);
MarkObjectDistributedGloballyOnMainDbs(markDistributedParams);
} }
} }
/* /*
* IsStatementSupportedFromNonMainDb returns true if the statement is supported from a * GetNonMainDbDistributeObjectOps returns the NonMainDbDistributeObjectOps for given
* non-main database. * command if it's node-wide object management command that's supported from non-main
* databases.
*/ */
static bool const NonMainDbDistributeObjectOps *
IsStatementSupportedFromNonMainDb(Node *parsetree) GetNonMainDbDistributeObjectOps(Node *parsetree)
{ {
NodeTag type = nodeTag(parsetree); switch (nodeTag(parsetree))
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{ {
if (type != NonMainDbSupportedStatements[i].statementType) case T_CreateRoleStmt:
{ {
continue; return &Any_CreateRole;
} }
return !NonMainDbSupportedStatements[i].checkSupportedObjectTypes || case T_DropRoleStmt:
NonMainDbSupportedStatements[i].checkSupportedObjectTypes(parsetree); {
return &Any_DropRole;
} }
return false; case T_AlterRoleStmt:
{
return &Any_AlterRole;
} }
case T_GrantRoleStmt:
{
return &Any_GrantRole;
}
case T_CreatedbStmt:
{
CreatedbStmt *stmt = castNode(CreatedbStmt, parsetree);
/* /*
* StatementRequiresMarkDistributedGloballyFromNonMainDb returns true if the statement should be marked * We don't try to send the query to the main database if the CREATE
* as distributed when executed from a non-main database. * DATABASE command is for the main database itself, this is a very
* rare case but it's exercised by our test suite.
*/ */
static bool if (strcmp(stmt->dbname, MainDb) != 0)
StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree)
{ {
NodeTag type = nodeTag(parsetree); return &Database_Create;
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].distObjectOperation ==
MARK_DISTRIBUTED_GLOBALLY;
}
} }
return false; return NULL;
} }
case T_DropdbStmt:
{
DropdbStmt *stmt = castNode(DropdbStmt, parsetree);
/* /*
* StatementRequiresUnmarkDistributedLocallyFromNonMainDb returns true if the statement should be unmarked * We don't try to send the query to the main database if the DROP
* as distributed when executed from a non-main database. * DATABASE command is for the main database itself, this is a very
* rare case but it's exercised by our test suite.
*/ */
static bool if (strcmp(stmt->dbname, MainDb) != 0)
StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree)
{ {
NodeTag type = nodeTag(parsetree); return &Database_Drop;
}
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) / return NULL;
sizeof(NonMainDbSupportedStatements[0]); i++) }
case T_GrantStmt:
{ {
if (type == NonMainDbSupportedStatements[i].statementType) GrantStmt *stmt = castNode(GrantStmt, parsetree);
switch (stmt->objtype)
{ {
return NonMainDbSupportedStatements[i].distObjectOperation == case OBJECT_DATABASE:
UNMARK_DISTRIBUTED_LOCALLY; {
return &Database_Grant;
}
default:
return NULL;
} }
} }
return false; case T_SecLabelStmt:
{
SecLabelStmt *stmt = castNode(SecLabelStmt, parsetree);
switch (stmt->objtype)
{
case OBJECT_ROLE:
{
return &Role_SecLabel;
}
default:
return NULL;
}
}
default:
return NULL;
}
} }
/* /*
* StatementCannotBeExecutedInTransaction returns true if the statement cannot be executed in a * MarkObjectDistributedGloballyOnMainDbs marks an object as
* transaction. * distributed on all main databases globally.
*/
static bool
StatementCannotBeExecutedInTransaction(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].cannotBeExecutedInTransaction;
}
}
return false;
}
/*
* MarkObjectDistributedGloballyFromNonMainDb marks the given object as distributed on the
* non-main database.
*/ */
static void static void
MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree) MarkObjectDistributedGloballyOnMainDbs(
{ MarkDistributedGloballyParams *markDistributedParams)
List *distObjectOperationParams =
GetDistObjectOperationParams(parsetree);
DistObjectOperationParams *distObjectOperationParam = NULL;
foreach_ptr(distObjectOperationParam, distObjectOperationParams)
{ {
StringInfo mainDBQuery = makeStringInfo(); StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery, appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED, MARK_OBJECT_DISTRIBUTED,
distObjectOperationParam->catalogRelId, markDistributedParams->catalogRelId,
quote_literal_cstr(distObjectOperationParam->name), quote_literal_cstr(markDistributedParams->name),
distObjectOperationParam->id, markDistributedParams->id,
quote_literal_cstr(CurrentUserName())); quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data); RunCitusMainDBQuery(mainDBQuery->data);
} }
}
/* /*
* UnMarkObjectDistributedLocallyFromNonMainDb unmarks the given object as distributed on the * UnmarkObjectDistributedOnLocalMainDb unmarks a list of objects as
* non-main database. * distributed on the local main database.
*/ */
static void static void
UnMarkObjectDistributedLocallyFromNonMainDb(List *markObjectDistributedParamList) UnmarkObjectDistributedOnLocalMainDb(List *unmarkDistributedParamsList)
{ {
DistObjectOperationParams *markObjectDistributedParam = NULL;
int subObjectId = 0; int subObjectId = 0;
char *checkObjectExistence = "false"; char *checkObjectExistence = "false";
foreach_ptr(markObjectDistributedParam, markObjectDistributedParamList)
UnmarkDistributedLocallyParams *unmarkDistributedParams = NULL;
foreach_ptr(unmarkDistributedParams, unmarkDistributedParamsList)
{ {
StringInfo query = makeStringInfo(); StringInfo query = makeStringInfo();
appendStringInfo(query, appendStringInfo(query,
UNMARK_OBJECT_DISTRIBUTED, UNMARK_OBJECT_DISTRIBUTED,
AuthIdRelationId, unmarkDistributedParams->catalogRelId,
markObjectDistributedParam->id, unmarkDistributedParams->id,
subObjectId, checkObjectExistence); subObjectId, checkObjectExistence);
RunCitusMainDBQuery(query->data); RunCitusMainDBQuery(query->data);
} }
@ -344,107 +401,51 @@ UnMarkObjectDistributedLocallyFromNonMainDb(List *markObjectDistributedParamList
/* /*
* GetDistObjectOperationParams returns DistObjectOperationParams for the target * getMarkDistributedParams and getUnmarkDistributedParams callback implementations
* object of given parsetree. * for NonMainDbDistributeObjectOps start here.
*/ */
List * static MarkDistributedGloballyParams *
GetDistObjectOperationParams(Node *parsetree) CreateRoleStmtGetMarkDistributedParams(Node *parsetree)
{
List *paramsList = NIL;
if (IsA(parsetree, CreateRoleStmt))
{ {
CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree); CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree);
DistObjectOperationParams *params = MarkDistributedGloballyParams *params =
(DistObjectOperationParams *) palloc(sizeof(DistObjectOperationParams)); (MarkDistributedGloballyParams *) palloc(sizeof(MarkDistributedGloballyParams));
params->name = stmt->role; params->name = stmt->role;
params->catalogRelId = AuthIdRelationId; params->catalogRelId = AuthIdRelationId;
params->id = get_role_oid(stmt->role, false);
paramsList = lappend(paramsList, params); /* object must exist as we've just created it */
bool missingOk = false;
params->id = get_role_oid(stmt->role, missingOk);
return params;
} }
else if (IsA(parsetree, DropRoleStmt))
static List *
DropRoleStmtGetUnmarkDistributedParams(Node *parsetree)
{ {
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree); DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
RoleSpec *roleSpec;
List *paramsList = NIL;
RoleSpec *roleSpec = NULL;
foreach_ptr(roleSpec, stmt->roles) foreach_ptr(roleSpec, stmt->roles)
{ {
DistObjectOperationParams *params = (DistObjectOperationParams *) palloc( UnmarkDistributedLocallyParams *params =
sizeof(DistObjectOperationParams)); (UnmarkDistributedLocallyParams *) palloc(
sizeof(UnmarkDistributedLocallyParams));
Oid roleOid = get_role_oid(roleSpec->rolename, true);
Oid roleOid = get_role_oid(roleSpec->rolename, stmt->missing_ok);
if (roleOid == InvalidOid) if (roleOid == InvalidOid)
{ {
continue; continue;
} }
params->id = roleOid; params->id = roleOid;
params->name = roleSpec->rolename;
params->catalogRelId = AuthIdRelationId; params->catalogRelId = AuthIdRelationId;
paramsList = lappend(paramsList, params); paramsList = lappend(paramsList, params);
} }
}
else
{
elog(ERROR, "unsupported statement type");
}
return paramsList; return paramsList;
} }
/*
* NonMainDbCheckSupportedObjectTypeForCreateDatabase implements checkSupportedObjectTypes
* callback for CreatedbStmt.
*
* We don't try to send the query to the main database if the CREATE DATABASE
* command is for the main database itself, this is a very rare case but it's
* exercised by our test suite.
*/
static bool
NonMainDbCheckSupportedObjectTypeForCreateDatabase(Node *node)
{
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
return strcmp(stmt->dbname, MainDb) != 0;
}
/*
* NonMainDbCheckSupportedObjectTypeForDropDatabase implements checkSupportedObjectTypes
* callback for DropdbStmt.
*
* We don't try to send the query to the main database if the DROP DATABASE
* command is for the main database itself, this is a very rare case but it's
* exercised by our test suite.
*/
static bool
NonMainDbCheckSupportedObjectTypeForDropDatabase(Node *node)
{
DropdbStmt *stmt = castNode(DropdbStmt, node);
return strcmp(stmt->dbname, MainDb) != 0;
}
/*
* NonMainDbCheckSupportedObjectTypeForGrant implements checkSupportedObjectTypes
* callback for GrantStmt.
*/
static bool
NonMainDbCheckSupportedObjectTypeForGrant(Node *node)
{
GrantStmt *stmt = castNode(GrantStmt, node);
return stmt->objtype == OBJECT_DATABASE;
}
/*
* NonMainDbCheckSupportedObjectTypeForSecLabel implements checkSupportedObjectTypes
* callback for SecLabel.
*/
static bool
NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node)
{
SecLabelStmt *stmt = castNode(SecLabelStmt, node);
return stmt->objtype == OBJECT_ROLE;
}