mirror of https://github.com/citusdata/citus.git
Refactor the code that supports node-wide object mgmt commands from non-main dbs (#7544)
RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand are the entrypoints for this module. These functions are called from utility_hook.c to support some of the node-wide object management commands from non-main databases. To add support for a new command type, one needs to define a new NonMainDbDistributeObjectOps object and add it to GetNonMainDbDistributeObjectOps.pull/7537/head^2
parent
bf05bf51ec
commit
d129064280
|
@ -0,0 +1,351 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* non_main_db_distribute_object_ops.c
|
||||
*
|
||||
* Routines to support node-wide object management commands from non-main
|
||||
* databases.
|
||||
*
|
||||
* RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand are
|
||||
* the entrypoints for this module. These functions are called from
|
||||
* utility_hook.c to support some of the node-wide object management
|
||||
* commands from non-main databases.
|
||||
*
|
||||
* To add support for a new command type, one needs to define a new
|
||||
* NonMainDbDistributeObjectOps object within OperationArray. Also, if
|
||||
* the command requires marking or unmarking some objects as distributed,
|
||||
* the necessary operations can be implemented in
|
||||
* RunPreprocessNonMainDBCommand and RunPostprocessNonMainDBCommand.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "catalog/pg_authid_d.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
|
||||
|
||||
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
|
||||
#define START_MANAGEMENT_TRANSACTION \
|
||||
"SELECT citus_internal.start_management_transaction('%lu')"
|
||||
#define MARK_OBJECT_DISTRIBUTED \
|
||||
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
|
||||
#define UNMARK_OBJECT_DISTRIBUTED \
|
||||
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)"
|
||||
|
||||
|
||||
/*
|
||||
* NonMainDbDistributeObjectOps contains the necessary callbacks / flags to
|
||||
* support node-wide object management commands from non-main databases.
|
||||
*
|
||||
* cannotBeExecutedInTransaction:
|
||||
* Indicates whether the statement cannot be executed in a transaction. If
|
||||
* this is set to true, the statement will be executed directly on the main
|
||||
* database because there are no transactional visibility issues for such
|
||||
* commands.
|
||||
*
|
||||
* checkSupportedObjectType:
|
||||
* Callback function that checks whether type of the object referred to by
|
||||
* given statement is supported. Can be NULL if not applicable for the
|
||||
* statement type.
|
||||
*/
|
||||
typedef struct NonMainDbDistributeObjectOps
|
||||
{
|
||||
bool cannotBeExecutedInTransaction;
|
||||
bool (*checkSupportedObjectType)(Node *parsetree);
|
||||
} NonMainDbDistributeObjectOps;
|
||||
|
||||
|
||||
/*
|
||||
* checkSupportedObjectType callbacks for OperationArray.
|
||||
*/
|
||||
static bool CreateDbStmtCheckSupportedObjectType(Node *node);
|
||||
static bool DropDbStmtCheckSupportedObjectType(Node *node);
|
||||
static bool GrantStmtCheckSupportedObjectType(Node *node);
|
||||
static bool SecLabelStmtCheckSupportedObjectType(Node *node);
|
||||
|
||||
/*
|
||||
* OperationArray that holds NonMainDbDistributeObjectOps for different command types.
|
||||
*/
|
||||
static const NonMainDbDistributeObjectOps *const OperationArray[] = {
|
||||
[T_CreateRoleStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = NULL
|
||||
},
|
||||
[T_DropRoleStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = NULL
|
||||
},
|
||||
[T_AlterRoleStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = NULL
|
||||
},
|
||||
[T_GrantRoleStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = NULL
|
||||
},
|
||||
[T_CreatedbStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = true,
|
||||
.checkSupportedObjectType = CreateDbStmtCheckSupportedObjectType
|
||||
},
|
||||
[T_DropdbStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = true,
|
||||
.checkSupportedObjectType = DropDbStmtCheckSupportedObjectType
|
||||
},
|
||||
[T_GrantStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = GrantStmtCheckSupportedObjectType
|
||||
},
|
||||
[T_SecLabelStmt] = &(NonMainDbDistributeObjectOps) {
|
||||
.cannotBeExecutedInTransaction = false,
|
||||
.checkSupportedObjectType = SecLabelStmtCheckSupportedObjectType
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
/* other static function declarations */
|
||||
const NonMainDbDistributeObjectOps * GetNonMainDbDistributeObjectOps(Node *parsetree);
|
||||
static void CreateRoleStmtMarkDistGloballyOnMainDbs(CreateRoleStmt *createRoleStmt);
|
||||
static void DropRoleStmtUnmarkDistOnLocalMainDb(DropRoleStmt *dropRoleStmt);
|
||||
static void MarkObjectDistributedGloballyOnMainDbs(Oid catalogRelId, Oid objectId,
|
||||
char *objectName);
|
||||
static void UnmarkObjectDistributedOnLocalMainDb(uint16 catalogRelId, Oid objectId);
|
||||
|
||||
|
||||
/*
|
||||
* RunPreprocessNonMainDBCommand runs the necessary commands for a query, in main
|
||||
* database before query is run on the local node with PrevProcessUtility.
|
||||
*
|
||||
* Returns true if previous utility hook needs to be skipped after completing
|
||||
* preprocess phase.
|
||||
*/
|
||||
bool
|
||||
RunPreprocessNonMainDBCommand(Node *parsetree)
|
||||
{
|
||||
if (IsMainDB)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
const NonMainDbDistributeObjectOps *ops = GetNonMainDbDistributeObjectOps(parsetree);
|
||||
if (!ops)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
char *queryString = DeparseTreeNode(parsetree);
|
||||
|
||||
/*
|
||||
* For the commands that cannot be executed in a transaction, there are no
|
||||
* transactional visibility issues. We directly route them to main database
|
||||
* so that we only have to consider one code-path for such commands.
|
||||
*/
|
||||
if (ops->cannotBeExecutedInTransaction)
|
||||
{
|
||||
IsMainDBCommandInXact = false;
|
||||
RunCitusMainDBQuery((char *) queryString);
|
||||
return true;
|
||||
}
|
||||
|
||||
IsMainDBCommandInXact = true;
|
||||
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
START_MANAGEMENT_TRANSACTION,
|
||||
GetCurrentFullTransactionId().value);
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
|
||||
mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
|
||||
quote_literal_cstr(queryString),
|
||||
quote_literal_cstr(CurrentUserName()));
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
|
||||
if (IsA(parsetree, DropRoleStmt))
|
||||
{
|
||||
DropRoleStmtUnmarkDistOnLocalMainDb((DropRoleStmt *) parsetree);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunPostprocessNonMainDBCommand runs the necessary commands for a query, in main
|
||||
* database after query is run on the local node with PrevProcessUtility.
|
||||
*/
|
||||
void
|
||||
RunPostprocessNonMainDBCommand(Node *parsetree)
|
||||
{
|
||||
if (IsMainDB || !GetNonMainDbDistributeObjectOps(parsetree))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (IsA(parsetree, CreateRoleStmt))
|
||||
{
|
||||
CreateRoleStmtMarkDistGloballyOnMainDbs((CreateRoleStmt *) parsetree);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNonMainDbDistributeObjectOps returns the NonMainDbDistributeObjectOps for given
|
||||
* command if it's node-wide object management command that's supported from non-main
|
||||
* databases.
|
||||
*/
|
||||
const NonMainDbDistributeObjectOps *
|
||||
GetNonMainDbDistributeObjectOps(Node *parsetree)
|
||||
{
|
||||
NodeTag tag = nodeTag(parsetree);
|
||||
if (tag >= lengthof(OperationArray))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
const NonMainDbDistributeObjectOps *ops = OperationArray[tag];
|
||||
|
||||
if (ops == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!ops->checkSupportedObjectType ||
|
||||
ops->checkSupportedObjectType(parsetree))
|
||||
{
|
||||
return ops;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateRoleStmtMarkDistGloballyOnMainDbs marks the role as
|
||||
* distributed on all main databases globally.
|
||||
*/
|
||||
static void
|
||||
CreateRoleStmtMarkDistGloballyOnMainDbs(CreateRoleStmt *createRoleStmt)
|
||||
{
|
||||
/* object must exist as we've just created it */
|
||||
bool missingOk = false;
|
||||
Oid roleId = get_role_oid(createRoleStmt->role, missingOk);
|
||||
|
||||
MarkObjectDistributedGloballyOnMainDbs(AuthIdRelationId, roleId,
|
||||
createRoleStmt->role);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DropRoleStmtUnmarkDistOnLocalMainDb unmarks the roles as
|
||||
* distributed on the local main database.
|
||||
*/
|
||||
static void
|
||||
DropRoleStmtUnmarkDistOnLocalMainDb(DropRoleStmt *dropRoleStmt)
|
||||
{
|
||||
RoleSpec *roleSpec = NULL;
|
||||
foreach_ptr(roleSpec, dropRoleStmt->roles)
|
||||
{
|
||||
Oid roleOid = get_role_oid(roleSpec->rolename,
|
||||
dropRoleStmt->missing_ok);
|
||||
if (roleOid == InvalidOid)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
UnmarkObjectDistributedOnLocalMainDb(AuthIdRelationId, roleOid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkObjectDistributedGloballyOnMainDbs marks an object as
|
||||
* distributed on all main databases globally.
|
||||
*/
|
||||
static void
|
||||
MarkObjectDistributedGloballyOnMainDbs(Oid catalogRelId, Oid objectId, char *objectName)
|
||||
{
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
MARK_OBJECT_DISTRIBUTED,
|
||||
catalogRelId,
|
||||
quote_literal_cstr(objectName),
|
||||
objectId,
|
||||
quote_literal_cstr(CurrentUserName()));
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UnmarkObjectDistributedOnLocalMainDb unmarks an object as
|
||||
* distributed on the local main database.
|
||||
*/
|
||||
static void
|
||||
UnmarkObjectDistributedOnLocalMainDb(uint16 catalogRelId, Oid objectId)
|
||||
{
|
||||
const int subObjectId = 0;
|
||||
const char *checkObjectExistence = "false";
|
||||
|
||||
StringInfo query = makeStringInfo();
|
||||
appendStringInfo(query,
|
||||
UNMARK_OBJECT_DISTRIBUTED,
|
||||
catalogRelId, objectId,
|
||||
subObjectId, checkObjectExistence);
|
||||
RunCitusMainDBQuery(query->data);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* checkSupportedObjectTypes callbacks for OperationArray lie below.
|
||||
*/
|
||||
static bool
|
||||
CreateDbStmtCheckSupportedObjectType(Node *node)
|
||||
{
|
||||
/*
|
||||
* We don't try to send the query to the main database if the CREATE
|
||||
* DATABASE command is for the main database itself, this is a very
|
||||
* rare case but it's exercised by our test suite.
|
||||
*/
|
||||
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||
return strcmp(stmt->dbname, MainDb) != 0;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
DropDbStmtCheckSupportedObjectType(Node *node)
|
||||
{
|
||||
/*
|
||||
* We don't try to send the query to the main database if the DROP
|
||||
* DATABASE command is for the main database itself, this is a very
|
||||
* rare case but it's exercised by our test suite.
|
||||
*/
|
||||
DropdbStmt *stmt = castNode(DropdbStmt, node);
|
||||
return strcmp(stmt->dbname, MainDb) != 0;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
GrantStmtCheckSupportedObjectType(Node *node)
|
||||
{
|
||||
GrantStmt *stmt = castNode(GrantStmt, node);
|
||||
return stmt->objtype == OBJECT_DATABASE;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
SecLabelStmtCheckSupportedObjectType(Node *node)
|
||||
{
|
||||
SecLabelStmt *stmt = castNode(SecLabelStmt, node);
|
||||
return stmt->objtype == OBJECT_ROLE;
|
||||
}
|
|
@ -87,69 +87,6 @@
|
|||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
||||
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
|
||||
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
|
||||
#define START_MANAGEMENT_TRANSACTION \
|
||||
"SELECT citus_internal.start_management_transaction('%lu')"
|
||||
#define MARK_OBJECT_DISTRIBUTED \
|
||||
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
|
||||
#define UNMARK_OBJECT_DISTRIBUTED \
|
||||
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d,%s)"
|
||||
|
||||
/* see NonMainDbDistributedStatementInfo for the explanation of these flags */
|
||||
typedef enum DistObjectOperation
|
||||
{
|
||||
NO_DIST_OBJECT_OPERATION,
|
||||
MARK_DISTRIBUTED_GLOBALLY,
|
||||
UNMARK_DISTRIBUTED_LOCALLY
|
||||
} DistObjectOperation;
|
||||
|
||||
|
||||
/*
|
||||
* NonMainDbDistributedStatementInfo is used to determine whether a statement is
|
||||
* supported from non-main databases and whether it should be marked or unmarked
|
||||
* as distributed.
|
||||
*
|
||||
* When creating a distributed object, we always have to mark such objects as
|
||||
* "distributed" but while for some object types we can delegate this to main
|
||||
* database, for some others we have to explicitly send a command to all nodes
|
||||
* in this code-path to achieve this. Callers need to provide
|
||||
* MARK_DISTRIBUTED_GLOBALLY when that is the case.
|
||||
*
|
||||
* Similarly when dropping a distributed object, we always have to unmark such
|
||||
* objects as "distributed" and our utility hook on remote nodes achieve this
|
||||
* via UnmarkNodeWideObjectsDistributed() because the commands that we send to
|
||||
* workers are executed via main db. However for the local node, this is not the
|
||||
* case as we're not in the main db. For this reason, callers need to provide
|
||||
* UNMARK_DISTRIBUTED_LOCALLY to unmark an object for local node as well.
|
||||
*/
|
||||
typedef struct NonMainDbDistributedStatementInfo
|
||||
{
|
||||
int statementType;
|
||||
DistObjectOperation DistObjectOperation;
|
||||
|
||||
/*
|
||||
* checkSupportedObjectTypes is a callback function that checks whether
|
||||
* type of the object referred to by given statement is supported.
|
||||
*
|
||||
* Can be NULL if not applicable for the statement type.
|
||||
*/
|
||||
bool (*checkSupportedObjectTypes)(Node *node);
|
||||
} NonMainDbDistributedStatementInfo;
|
||||
|
||||
/*
|
||||
* DistObjectOperationParams is used to pass parameters to the
|
||||
* MarkObjectDistributedGloballyFromNonMainDb function and
|
||||
* UnMarkObjectDistributedLocallyFromNonMainDb functions.
|
||||
*/
|
||||
typedef struct DistObjectOperationParams
|
||||
{
|
||||
char *name;
|
||||
Oid id;
|
||||
uint16 catalogRelId;
|
||||
} DistObjectOperationParams;
|
||||
|
||||
|
||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
|
||||
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
|
||||
|
@ -179,46 +116,6 @@ static bool IsDropSchemaOrDB(Node *parsetree);
|
|||
static bool ShouldCheckUndistributeCitusLocalTables(void);
|
||||
|
||||
|
||||
/*
|
||||
* Functions to support commands used to manage node-wide objects from non-main
|
||||
* databases.
|
||||
*/
|
||||
static bool IsCommandToCreateOrDropMainDB(Node *parsetree);
|
||||
static void RunPreprocessMainDBCommand(Node *parsetree);
|
||||
static void RunPostprocessMainDBCommand(Node *parsetree);
|
||||
static bool IsStatementSupportedFromNonMainDb(Node *parsetree);
|
||||
static bool StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree);
|
||||
static bool StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree);
|
||||
static void MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree);
|
||||
static void UnMarkObjectDistributedLocallyFromNonMainDb(List *unmarkDistributedList);
|
||||
static List * GetDistObjectOperationParams(Node *parsetree);
|
||||
|
||||
/*
|
||||
* checkSupportedObjectTypes callbacks for
|
||||
* NonMainDbDistributedStatementInfo objects.
|
||||
*/
|
||||
static bool NonMainDbCheckSupportedObjectTypeForGrant(Node *node);
|
||||
static bool NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node);
|
||||
|
||||
|
||||
/*
|
||||
* NonMainDbSupportedStatements is an array of statements that are supported
|
||||
* from non-main databases.
|
||||
*/
|
||||
ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE };
|
||||
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = {
|
||||
{ T_GrantRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
|
||||
{ T_CreateRoleStmt, MARK_DISTRIBUTED_GLOBALLY, NULL },
|
||||
{ T_DropRoleStmt, UNMARK_DISTRIBUTED_LOCALLY, NULL },
|
||||
{ T_AlterRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
|
||||
{ T_GrantStmt, NO_DIST_OBJECT_OPERATION, NonMainDbCheckSupportedObjectTypeForGrant },
|
||||
{ T_CreatedbStmt, NO_DIST_OBJECT_OPERATION, NULL },
|
||||
{ T_DropdbStmt, NO_DIST_OBJECT_OPERATION, NULL },
|
||||
{ T_SecLabelStmt, NO_DIST_OBJECT_OPERATION,
|
||||
NonMainDbCheckSupportedObjectTypeForSecLabel },
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
|
||||
* pieces of a utility statement before invoking ProcessUtility.
|
||||
|
@ -350,36 +247,25 @@ citus_ProcessUtility(PlannedStmt *pstmt,
|
|||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
/*
|
||||
* We always execute CREATE/DROP DATABASE from the main database. There are no
|
||||
* transactional visibility issues, since these commands are non-transactional.
|
||||
* And this way we only have to consider one codepath when creating databases.
|
||||
* We don't try to send the query to the main database if the CREATE/DROP DATABASE
|
||||
* command is for the main database itself, this is a very rare case but it's
|
||||
* exercised by our test suite.
|
||||
* Process the command via RunPreprocessNonMainDBCommand and
|
||||
* RunPostprocessNonMainDBCommand hooks if we're in a non-main database
|
||||
* and if the command is a node-wide object management command that we
|
||||
* support from non-main databases.
|
||||
*/
|
||||
if (!IsMainDB &&
|
||||
!IsCommandToCreateOrDropMainDB(parsetree))
|
||||
{
|
||||
RunPreprocessMainDBCommand(parsetree);
|
||||
|
||||
if (IsA(parsetree, CreatedbStmt) ||
|
||||
IsA(parsetree, DropdbStmt))
|
||||
{
|
||||
return;
|
||||
}
|
||||
bool shouldSkipPrevUtilityHook = RunPreprocessNonMainDBCommand(parsetree);
|
||||
|
||||
if (!shouldSkipPrevUtilityHook)
|
||||
{
|
||||
/*
|
||||
* Ensure that utility commands do not behave any differently until CREATE
|
||||
* EXTENSION is invoked.
|
||||
*/
|
||||
PrevProcessUtility(pstmt, queryString, false, context,
|
||||
params, queryEnv, dest, completionTag);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that utility commands do not behave any differently until CREATE
|
||||
* EXTENSION is invoked.
|
||||
*/
|
||||
PrevProcessUtility(pstmt, queryString, false, context,
|
||||
params, queryEnv, dest, completionTag);
|
||||
|
||||
if (!IsMainDB)
|
||||
{
|
||||
RunPostprocessMainDBCommand(parsetree);
|
||||
}
|
||||
RunPostprocessNonMainDBCommand(parsetree);
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -1715,281 +1601,3 @@ DropSchemaOrDBInProgress(void)
|
|||
{
|
||||
return activeDropSchemaOrDBs > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCommandToCreateOrDropMainDB checks if this query creates or drops the
|
||||
* main database, so we can make an exception and not send this query to
|
||||
* the main database.
|
||||
*/
|
||||
static bool
|
||||
IsCommandToCreateOrDropMainDB(Node *parsetree)
|
||||
{
|
||||
if (IsA(parsetree, CreatedbStmt))
|
||||
{
|
||||
CreatedbStmt *createdbStmt = castNode(CreatedbStmt, parsetree);
|
||||
return strcmp(createdbStmt->dbname, MainDb) == 0;
|
||||
}
|
||||
else if (IsA(parsetree, DropdbStmt))
|
||||
{
|
||||
DropdbStmt *dropdbStmt = castNode(DropdbStmt, parsetree);
|
||||
return strcmp(dropdbStmt->dbname, MainDb) == 0;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main
|
||||
* database before query is run on the local node with PrevProcessUtility
|
||||
*/
|
||||
static void
|
||||
RunPreprocessMainDBCommand(Node *parsetree)
|
||||
{
|
||||
if (!IsStatementSupportedFromNonMainDb(parsetree))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
char *queryString = DeparseTreeNode(parsetree);
|
||||
|
||||
if (IsA(parsetree, CreatedbStmt) ||
|
||||
IsA(parsetree, DropdbStmt))
|
||||
{
|
||||
IsMainDBCommandInXact = false;
|
||||
RunCitusMainDBQuery((char *) queryString);
|
||||
return;
|
||||
}
|
||||
|
||||
IsMainDBCommandInXact = true;
|
||||
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
START_MANAGEMENT_TRANSACTION,
|
||||
GetCurrentFullTransactionId().value);
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
|
||||
mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
|
||||
quote_literal_cstr(queryString),
|
||||
quote_literal_cstr(CurrentUserName()));
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
|
||||
if (StatementRequiresUnmarkDistributedLocallyFromNonMainDb(parsetree))
|
||||
{
|
||||
List *unmarkParams = GetDistObjectOperationParams(parsetree);
|
||||
UnMarkObjectDistributedLocallyFromNonMainDb(unmarkParams);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunPostprocessMainDBCommand runs the necessary commands for a query, in main
|
||||
* database after query is run on the local node with PrevProcessUtility
|
||||
*/
|
||||
static void
|
||||
RunPostprocessMainDBCommand(Node *parsetree)
|
||||
{
|
||||
if (IsStatementSupportedFromNonMainDb(parsetree) &&
|
||||
StatementRequiresMarkDistributedGloballyFromNonMainDb(parsetree))
|
||||
{
|
||||
MarkObjectDistributedGloballyFromNonMainDb(parsetree);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsStatementSupportedFromNonMainDb returns true if the statement is supported from a
|
||||
* non-main database.
|
||||
*/
|
||||
static bool
|
||||
IsStatementSupportedFromNonMainDb(Node *parsetree)
|
||||
{
|
||||
NodeTag type = nodeTag(parsetree);
|
||||
|
||||
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
|
||||
sizeof(NonMainDbSupportedStatements[0]); i++)
|
||||
{
|
||||
if (type != NonMainDbSupportedStatements[i].statementType)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
return !NonMainDbSupportedStatements[i].checkSupportedObjectTypes ||
|
||||
NonMainDbSupportedStatements[i].checkSupportedObjectTypes(parsetree);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StatementRequiresMarkDistributedGloballyFromNonMainDb returns true if the statement should be marked
|
||||
* as distributed when executed from a non-main database.
|
||||
*/
|
||||
static bool
|
||||
StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree)
|
||||
{
|
||||
NodeTag type = nodeTag(parsetree);
|
||||
|
||||
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
|
||||
sizeof(NonMainDbSupportedStatements[0]); i++)
|
||||
{
|
||||
if (type == NonMainDbSupportedStatements[i].statementType)
|
||||
{
|
||||
return NonMainDbSupportedStatements[i].DistObjectOperation ==
|
||||
MARK_DISTRIBUTED_GLOBALLY;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StatementRequiresUnmarkDistributedLocallyFromNonMainDb returns true if the statement should be unmarked
|
||||
* as distributed when executed from a non-main database.
|
||||
*/
|
||||
static bool
|
||||
StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree)
|
||||
{
|
||||
NodeTag type = nodeTag(parsetree);
|
||||
|
||||
for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
|
||||
sizeof(NonMainDbSupportedStatements[0]); i++)
|
||||
{
|
||||
if (type == NonMainDbSupportedStatements[i].statementType)
|
||||
{
|
||||
return NonMainDbSupportedStatements[i].DistObjectOperation ==
|
||||
UNMARK_DISTRIBUTED_LOCALLY;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkObjectDistributedGloballyFromNonMainDb marks the given object as distributed on the
|
||||
* non-main database.
|
||||
*/
|
||||
static void
|
||||
MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree)
|
||||
{
|
||||
List *distObjectOperationParams =
|
||||
GetDistObjectOperationParams(parsetree);
|
||||
|
||||
DistObjectOperationParams *distObjectOperationParam = NULL;
|
||||
|
||||
foreach_ptr(distObjectOperationParam, distObjectOperationParams)
|
||||
{
|
||||
StringInfo mainDBQuery = makeStringInfo();
|
||||
appendStringInfo(mainDBQuery,
|
||||
MARK_OBJECT_DISTRIBUTED,
|
||||
distObjectOperationParam->catalogRelId,
|
||||
quote_literal_cstr(distObjectOperationParam->name),
|
||||
distObjectOperationParam->id,
|
||||
quote_literal_cstr(CurrentUserName()));
|
||||
RunCitusMainDBQuery(mainDBQuery->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UnMarkObjectDistributedLocallyFromNonMainDb unmarks the given object as distributed on the
|
||||
* non-main database.
|
||||
*/
|
||||
static void
|
||||
UnMarkObjectDistributedLocallyFromNonMainDb(List *markObjectDistributedParamList)
|
||||
{
|
||||
DistObjectOperationParams *markObjectDistributedParam = NULL;
|
||||
int subObjectId = 0;
|
||||
char *checkObjectExistence = "false";
|
||||
foreach_ptr(markObjectDistributedParam, markObjectDistributedParamList)
|
||||
{
|
||||
StringInfo query = makeStringInfo();
|
||||
appendStringInfo(query,
|
||||
UNMARK_OBJECT_DISTRIBUTED,
|
||||
AuthIdRelationId,
|
||||
markObjectDistributedParam->id,
|
||||
subObjectId, checkObjectExistence);
|
||||
RunCitusMainDBQuery(query->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDistObjectOperationParams returns DistObjectOperationParams for the target
|
||||
* object of given parsetree.
|
||||
*/
|
||||
List *
|
||||
GetDistObjectOperationParams(Node *parsetree)
|
||||
{
|
||||
List *paramsList = NIL;
|
||||
if (IsA(parsetree, CreateRoleStmt))
|
||||
{
|
||||
CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree);
|
||||
DistObjectOperationParams *params =
|
||||
(DistObjectOperationParams *) palloc(sizeof(DistObjectOperationParams));
|
||||
params->name = stmt->role;
|
||||
params->catalogRelId = AuthIdRelationId;
|
||||
params->id = get_role_oid(stmt->role, false);
|
||||
|
||||
paramsList = lappend(paramsList, params);
|
||||
}
|
||||
else if (IsA(parsetree, DropRoleStmt))
|
||||
{
|
||||
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
|
||||
RoleSpec *roleSpec;
|
||||
foreach_ptr(roleSpec, stmt->roles)
|
||||
{
|
||||
DistObjectOperationParams *params = (DistObjectOperationParams *) palloc(
|
||||
sizeof(DistObjectOperationParams));
|
||||
|
||||
Oid roleOid = get_role_oid(roleSpec->rolename, true);
|
||||
|
||||
if (roleOid == InvalidOid)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
params->id = roleOid;
|
||||
params->name = roleSpec->rolename;
|
||||
params->catalogRelId = AuthIdRelationId;
|
||||
|
||||
paramsList = lappend(paramsList, params);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR, "unsupported statement type");
|
||||
}
|
||||
|
||||
return paramsList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NonMainDbCheckSupportedObjectTypeForGrant implements checkSupportedObjectTypes
|
||||
* callback for GrantStmt.
|
||||
*/
|
||||
static bool
|
||||
NonMainDbCheckSupportedObjectTypeForGrant(Node *node)
|
||||
{
|
||||
GrantStmt *stmt = castNode(GrantStmt, node);
|
||||
return stmt->objtype == OBJECT_DATABASE;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NonMainDbCheckSupportedObjectTypeForSecLabel implements checkSupportedObjectTypes
|
||||
* callback for SecLabel.
|
||||
*/
|
||||
static bool
|
||||
NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node)
|
||||
{
|
||||
SecLabelStmt *stmt = castNode(SecLabelStmt, node);
|
||||
return stmt->objtype == OBJECT_ROLE;
|
||||
}
|
||||
|
|
|
@ -104,6 +104,10 @@ typedef struct DistributeObjectOps
|
|||
|
||||
const DistributeObjectOps * GetDistributeObjectOps(Node *node);
|
||||
|
||||
/* functions to support node-wide object management commands from non-main dbs */
|
||||
extern bool RunPreprocessNonMainDBCommand(Node *parsetree);
|
||||
extern void RunPostprocessNonMainDBCommand(Node *parsetree);
|
||||
|
||||
/*
|
||||
* Flags that can be passed to GetForeignKeyOids to indicate
|
||||
* which foreign key constraint OIDs are to be extracted
|
||||
|
|
Loading…
Reference in New Issue