Merge pull request #4544 from citusdata/refactor_utility_hook

Refactor utility hook
pull/4546/head
Önder Kalacı 2021-01-20 16:02:48 +03:00 committed by GitHub
commit 64a1fddd9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 150 additions and 115 deletions

View File

@ -629,8 +629,8 @@ ConvertTable(TableConversionState *con)
Node *parseTree = ParseTreeNode(tableCreationSql); Node *parseTree = ParseTreeNode(tableCreationSql);
RelayEventExtendNames(parseTree, con->schemaName, con->hashOfName); RelayEventExtendNames(parseTree, con->schemaName, con->hashOfName);
CitusProcessUtility(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL, ProcessUtilityParseTree(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
} }
/* set columnar options */ /* set columnar options */
@ -682,8 +682,9 @@ ConvertTable(TableConversionState *con)
{ {
Node *parseTree = ParseTreeNode(attachPartitionCommand); Node *parseTree = ParseTreeNode(attachPartitionCommand);
CitusProcessUtility(parseTree, attachPartitionCommand, PROCESS_UTILITY_TOPLEVEL, ProcessUtilityParseTree(parseTree, attachPartitionCommand,
NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
} }
if (isPartitionTable) if (isPartitionTable)

View File

@ -426,8 +426,8 @@ ExecuteAndLogDDLCommand(const char *commandString)
ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); ereport(DEBUG4, (errmsg("executing \"%s\"", commandString)));
Node *parseTree = ParseTreeNode(commandString); Node *parseTree = ParseTreeNode(commandString);
CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
} }
@ -478,6 +478,6 @@ ExecuteForeignKeyCreateCommand(const char *commandString, bool skip_validation)
"command \"%s\"", commandString))); "command \"%s\"", commandString)));
} }
CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
} }

View File

@ -75,6 +75,13 @@ static int activeDropSchemaOrDBs = 0;
/* Local functions forward declarations for helper functions */ /* Local functions forward declarations for helper functions */
static void ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletionCompat *completionTag);
static char * SetSearchPathToCurrentSearchPathCommand(void); static char * SetSearchPathToCurrentSearchPathCommand(void);
static char * CurrentSearchPath(void); static char * CurrentSearchPath(void);
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
@ -84,13 +91,14 @@ static bool IsDropSchemaOrDB(Node *parsetree);
/* /*
* CitusProcessUtility is a convenience method to create a PlannedStmt out of pieces of a * ProcessUtilityForParseTree is a convenience method to create a PlannedStmt out of
* utility statement before invoking ProcessUtility. * pieces of a utility statement before invoking ProcessUtility.
*/ */
void void
CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityContext
ParamListInfo params, DestReceiver *dest, context,
QueryCompletionCompat *completionTag) ParamListInfo params, DestReceiver *dest,
QueryCompletionCompat *completionTag)
{ {
PlannedStmt *plannedStmt = makeNode(PlannedStmt); PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY; plannedStmt->commandType = CMD_UTILITY;
@ -120,7 +128,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
QueryCompletionCompat *completionTag) QueryCompletionCompat *completionTag)
{ {
Node *parsetree = pstmt->utilityStmt; Node *parsetree = pstmt->utilityStmt;
List *ddlJobs = NIL;
if (IsA(parsetree, TransactionStmt) || if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, LockStmt) || IsA(parsetree, LockStmt) ||
@ -164,6 +171,96 @@ multi_ProcessUtility(PlannedStmt *pstmt,
return; return;
} }
else if (IsA(parsetree, CallStmt))
{
CallStmt *callStmt = (CallStmt *) parsetree;
/*
* If the procedure is distributed and we are using MX then we have the
* possibility of calling it on the worker. If the data is located on
* the worker this can avoid making many network round trips.
*/
if (context == PROCESS_UTILITY_TOPLEVEL &&
CallDistributedProcedureRemotely(callStmt, dest))
{
return;
}
/*
* Stored procedures are a bit strange in the sense that some statements
* are not in a transaction block, but can be rolled back. We need to
* make sure we send all statements in a transaction block. The
* StoredProcedureLevel variable signals this to the router executor
* and indicates how deep in the call stack we are in case of nested
* stored procedures.
*/
StoredProcedureLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
StoredProcedureLevel -= 1;
}
PG_CATCH();
{
StoredProcedureLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
else if (IsA(parsetree, DoStmt))
{
/*
* All statements in a DO block are executed in a single transaciton,
* so we need to keep track of whether we are inside a DO block.
*/
DoBlockLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
DoBlockLevel -= 1;
}
PG_CATCH();
{
DoBlockLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
ProcessUtilityInternal(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
}
/*
* ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority
* of the Citus specific utility statements are handled here. The distinction between
* both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
* The reason for the distinction is implemented to be able to find the "top-level" DDL
* commands (not internal/cascading ones). UtilityHookLevel variable is used to achieve
* this goal.
*/
static void
ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletionCompat *completionTag)
{
Node *parsetree = pstmt->utilityStmt;
List *ddlJobs = NIL;
if (IsA(parsetree, ExplainStmt) && if (IsA(parsetree, ExplainStmt) &&
IsA(((ExplainStmt *) parsetree)->query, Query)) IsA(((ExplainStmt *) parsetree)->query, Query))
@ -213,73 +310,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
parsetree = ProcessCreateSubscriptionStmt(createSubStmt); parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
} }
if (IsA(parsetree, CallStmt))
{
CallStmt *callStmt = (CallStmt *) parsetree;
/*
* If the procedure is distributed and we are using MX then we have the
* possibility of calling it on the worker. If the data is located on
* the worker this can avoid making many network round trips.
*/
if (context == PROCESS_UTILITY_TOPLEVEL &&
CallDistributedProcedureRemotely(callStmt, dest))
{
return;
}
/*
* Stored procedures are a bit strange in the sense that some statements
* are not in a transaction block, but can be rolled back. We need to
* make sure we send all statements in a transaction block. The
* StoredProcedureLevel variable signals this to the router executor
* and indicates how deep in the call stack we are in case of nested
* stored procedures.
*/
StoredProcedureLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
StoredProcedureLevel -= 1;
}
PG_CATCH();
{
StoredProcedureLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
if (IsA(parsetree, DoStmt))
{
/*
* All statements in a DO block are executed in a single transaciton,
* so we need to keep track of whether we are inside a DO block.
*/
DoBlockLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
DoBlockLevel -= 1;
}
PG_CATCH();
{
DoBlockLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
/* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */ /* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */
if (IsA(parsetree, VariableSetStmt)) if (IsA(parsetree, VariableSetStmt))
{ {
@ -473,6 +503,8 @@ multi_ProcessUtility(PlannedStmt *pstmt,
* the available version is different than the current version of Citus. In this case, * the available version is different than the current version of Citus. In this case,
* ALTER EXTENSION citus UPDATE command can actually update Citus to a new version. * ALTER EXTENSION citus UPDATE command can actually update Citus to a new version.
*/ */
bool isCreateAlterExtensionUpdateCitusStmt =
IsCreateAlterExtensionUpdateCitusStmt(parsetree);
bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt && bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt &&
IsA(parsetree, AlterExtensionStmt); IsA(parsetree, AlterExtensionStmt);

View File

@ -409,8 +409,9 @@ LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
* It is a regular utility command we should execute it locally via * It is a regular utility command we should execute it locally via
* process utility. * process utility.
*/ */
CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand, ProcessUtilityParseTree(localTaskRawParseTree, localTaskQueryCommand,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver,
NULL);
} }
} }
} }

View File

@ -41,8 +41,8 @@ alter_role_if_exists(PG_FUNCTION_ARGS)
Node *parseTree = ParseTreeNode(utilityQuery); Node *parseTree = ParseTreeNode(utilityQuery);
CitusProcessUtility(parseTree, utilityQuery, PROCESS_UTILITY_TOPLEVEL, NULL, ProcessUtilityParseTree(parseTree, utilityQuery, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }
@ -96,11 +96,11 @@ worker_create_or_alter_role(PG_FUNCTION_ARGS)
quote_literal_cstr(createRoleUtilityQuery)))); quote_literal_cstr(createRoleUtilityQuery))));
} }
CitusProcessUtility(parseTree, ProcessUtilityParseTree(parseTree,
createRoleUtilityQuery, createRoleUtilityQuery,
PROCESS_UTILITY_TOPLEVEL, PROCESS_UTILITY_TOPLEVEL,
NULL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }
@ -124,11 +124,11 @@ worker_create_or_alter_role(PG_FUNCTION_ARGS)
quote_literal_cstr(alterRoleUtilityQuery)))); quote_literal_cstr(alterRoleUtilityQuery))));
} }
CitusProcessUtility(parseTree, ProcessUtilityParseTree(parseTree,
alterRoleUtilityQuery, alterRoleUtilityQuery,
PROCESS_UTILITY_TOPLEVEL, PROCESS_UTILITY_TOPLEVEL,
NULL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }

View File

@ -110,14 +110,14 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
RenameStmt *renameStmt = CreateRenameStatement(&address, newName); RenameStmt *renameStmt = CreateRenameStatement(&address, newName);
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt); const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
CitusProcessUtility((Node *) renameStmt, sqlRenameStmt, ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
PROCESS_UTILITY_TOPLEVEL, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
} }
/* apply create statement locally */ /* apply create statement locally */
CitusProcessUtility(parseTree, sqlStatement, PROCESS_UTILITY_TOPLEVEL, NULL, ProcessUtilityParseTree(parseTree, sqlStatement, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
/* type has been created */ /* type has been created */
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);

View File

@ -396,8 +396,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
/* extend names in ddl command and apply extended command */ /* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, schemaName, shardId); RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -428,8 +428,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId, leftShardSchemaName, rightShardId,
rightShardSchemaName); rightShardSchemaName);
CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -461,8 +461,8 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
} }
/* run the CREATE SEQUENCE command */ /* run the CREATE SEQUENCE command */
CitusProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL, ProcessUtilityParseTree(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
CommandCounterIncrement(); CommandCounterIncrement();
CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode; CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode;
@ -668,8 +668,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
CitusProcessUtility((Node *) localCopyCommand, queryString->data, ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
SetUserIdAndSecContext(savedUserId, savedSecurityContext); SetUserIdAndSecContext(savedUserId, savedSecurityContext);
@ -781,8 +781,8 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
SetDefElemArg(alterSequenceStatement, "restart", startFloatArg); SetDefElemArg(alterSequenceStatement, "restart", startFloatArg);
/* since the command is an AlterSeqStmt, a dummy command string works fine */ /* since the command is an AlterSeqStmt, a dummy command string works fine */
CitusProcessUtility((Node *) alterSequenceStatement, dummyString, ProcessUtilityParseTree((Node *) alterSequenceStatement, dummyString,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
} }
} }

View File

@ -65,11 +65,12 @@ extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
struct QueryEnvironment *queryEnv, DestReceiver *dest, struct QueryEnvironment *queryEnv, DestReceiver *dest,
QueryCompletionCompat *completionTag QueryCompletionCompat *completionTag
); );
extern void CitusProcessUtility(Node *node, const char *queryString, extern void ProcessUtilityParseTree(Node *node, const char *queryString,
ProcessUtilityContext context, ParamListInfo params, ProcessUtilityContext context, ParamListInfo
DestReceiver *dest, params,
QueryCompletionCompat *completionTag DestReceiver *dest,
); QueryCompletionCompat *completionTag
);
extern void MarkInvalidateForeignKeyGraph(void); extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void); extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString); extern List * DDLTaskList(Oid relationId, const char *commandString);