From 8df58926c52dc3c51b44c061cde714f8d7958ff7 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 20 Jan 2021 13:47:42 +0300 Subject: [PATCH 1/2] Rename CitusProcessUtility -> ProcessUtilityForNode --- .../distributed/commands/alter_table.c | 9 +++---- ..._table_operation_for_connected_relations.c | 8 +++---- .../distributed/commands/utility_hook.c | 11 +++++---- .../distributed/executor/local_executor.c | 5 ++-- src/backend/distributed/utils/role.c | 24 +++++++++---------- .../worker/worker_create_or_replace.c | 10 ++++---- .../worker/worker_data_fetch_protocol.c | 20 ++++++++-------- .../distributed/commands/utility_hook.h | 11 +++++---- 8 files changed, 51 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index fef1f41da..f0efe3c00 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -629,8 +629,8 @@ ConvertTable(TableConversionState *con) Node *parseTree = ParseTreeNode(tableCreationSql); RelayEventExtendNames(parseTree, con->schemaName, con->hashOfName); - CitusProcessUtility(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); } /* set columnar options */ @@ -682,8 +682,9 @@ ConvertTable(TableConversionState *con) { Node *parseTree = ParseTreeNode(attachPartitionCommand); - CitusProcessUtility(parseTree, attachPartitionCommand, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, attachPartitionCommand, + PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); } if (isPartitionTable) diff --git a/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c index 1afc4598d..5c91970b9 100644 --- a/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c +++ b/src/backend/distributed/commands/cascade_table_operation_for_connected_relations.c @@ -426,8 +426,8 @@ ExecuteAndLogDDLCommand(const char *commandString) ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); Node *parseTree = ParseTreeNode(commandString); - CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); } @@ -478,6 +478,6 @@ ExecuteForeignKeyCreateCommand(const char *commandString, bool skip_validation) "command \"%s\"", commandString))); } - CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index fd2527c73..2c4aab9c4 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -84,13 +84,14 @@ static bool IsDropSchemaOrDB(Node *parsetree); /* - * CitusProcessUtility is a convenience method to create a PlannedStmt out of pieces of a - * utility statement before invoking ProcessUtility. + * ProcessUtilityForParseTree is a convenience method to create a PlannedStmt out of + * pieces of a utility statement before invoking ProcessUtility. */ void -CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, - ParamListInfo params, DestReceiver *dest, - QueryCompletionCompat *completionTag) +ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityContext + context, + ParamListInfo params, DestReceiver *dest, + QueryCompletionCompat *completionTag) { PlannedStmt *plannedStmt = makeNode(PlannedStmt); plannedStmt->commandType = CMD_UTILITY; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 767dc7f27..ca12d8c4e 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -409,8 +409,9 @@ LocallyExecuteUtilityTask(const char *localTaskQueryCommand) * It is a regular utility command we should execute it locally via * process utility. */ - CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + ProcessUtilityParseTree(localTaskRawParseTree, localTaskQueryCommand, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, + NULL); } } } diff --git a/src/backend/distributed/utils/role.c b/src/backend/distributed/utils/role.c index 365b6eaa4..21ea68d5a 100644 --- a/src/backend/distributed/utils/role.c +++ b/src/backend/distributed/utils/role.c @@ -41,8 +41,8 @@ alter_role_if_exists(PG_FUNCTION_ARGS) Node *parseTree = ParseTreeNode(utilityQuery); - CitusProcessUtility(parseTree, utilityQuery, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, utilityQuery, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); PG_RETURN_BOOL(true); } @@ -96,11 +96,11 @@ worker_create_or_alter_role(PG_FUNCTION_ARGS) quote_literal_cstr(createRoleUtilityQuery)))); } - CitusProcessUtility(parseTree, - createRoleUtilityQuery, - PROCESS_UTILITY_TOPLEVEL, - NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, + createRoleUtilityQuery, + PROCESS_UTILITY_TOPLEVEL, + NULL, + None_Receiver, NULL); PG_RETURN_BOOL(true); } @@ -124,11 +124,11 @@ worker_create_or_alter_role(PG_FUNCTION_ARGS) quote_literal_cstr(alterRoleUtilityQuery)))); } - CitusProcessUtility(parseTree, - alterRoleUtilityQuery, - PROCESS_UTILITY_TOPLEVEL, - NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, + alterRoleUtilityQuery, + PROCESS_UTILITY_TOPLEVEL, + NULL, + None_Receiver, NULL); PG_RETURN_BOOL(true); } diff --git a/src/backend/distributed/worker/worker_create_or_replace.c b/src/backend/distributed/worker/worker_create_or_replace.c index da8b2f93c..9c875e827 100644 --- a/src/backend/distributed/worker/worker_create_or_replace.c +++ b/src/backend/distributed/worker/worker_create_or_replace.c @@ -110,14 +110,14 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS) RenameStmt *renameStmt = CreateRenameStatement(&address, newName); const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt); - CitusProcessUtility((Node *) renameStmt, sqlRenameStmt, - PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt, + PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); } /* apply create statement locally */ - CitusProcessUtility(parseTree, sqlStatement, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(parseTree, sqlStatement, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); /* type has been created */ PG_RETURN_BOOL(true); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 9c9982a6f..a56015a3e 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -396,8 +396,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); - CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); PG_RETURN_VOID(); } @@ -428,8 +428,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, rightShardSchemaName); - CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); PG_RETURN_VOID(); } @@ -461,8 +461,8 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) } /* run the CREATE SEQUENCE command */ - CitusProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + ProcessUtilityParseTree(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); CommandCounterIncrement(); CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode; @@ -668,8 +668,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - CitusProcessUtility((Node *) localCopyCommand, queryString->data, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); SetUserIdAndSecContext(savedUserId, savedSecurityContext); @@ -781,8 +781,8 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName, SetDefElemArg(alterSequenceStatement, "restart", startFloatArg); /* since the command is an AlterSeqStmt, a dummy command string works fine */ - CitusProcessUtility((Node *) alterSequenceStatement, dummyString, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + ProcessUtilityParseTree((Node *) alterSequenceStatement, dummyString, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); } } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index dc3cb3d71..e8dd59801 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -65,11 +65,12 @@ extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag ); -extern void CitusProcessUtility(Node *node, const char *queryString, - ProcessUtilityContext context, ParamListInfo params, - DestReceiver *dest, - QueryCompletionCompat *completionTag - ); +extern void ProcessUtilityParseTree(Node *node, const char *queryString, + ProcessUtilityContext context, ParamListInfo + params, + DestReceiver *dest, + QueryCompletionCompat *completionTag + ); extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); From 8129ce472f6c8af856c9efad9cb63d7fbc3e34cf Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 20 Jan 2021 13:50:42 +0300 Subject: [PATCH 2/2] Refactor Utility Hook We want to be able to find the "top-level" DDL commands (not internal/cascading ones). To achieve that, we have some refactoring. --- .../distributed/commands/utility_hook.c | 167 +++++++++++------- 1 file changed, 99 insertions(+), 68 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 2c4aab9c4..c0bf7aafc 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -75,6 +75,13 @@ static int activeDropSchemaOrDBs = 0; /* Local functions forward declarations for helper functions */ +static void ProcessUtilityInternal(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag); static char * SetSearchPathToCurrentSearchPathCommand(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -121,7 +128,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, QueryCompletionCompat *completionTag) { Node *parsetree = pstmt->utilityStmt; - List *ddlJobs = NIL; if (IsA(parsetree, TransactionStmt) || IsA(parsetree, LockStmt) || @@ -165,6 +171,96 @@ multi_ProcessUtility(PlannedStmt *pstmt, return; } + else if (IsA(parsetree, CallStmt)) + { + CallStmt *callStmt = (CallStmt *) parsetree; + + /* + * If the procedure is distributed and we are using MX then we have the + * possibility of calling it on the worker. If the data is located on + * the worker this can avoid making many network round trips. + */ + if (context == PROCESS_UTILITY_TOPLEVEL && + CallDistributedProcedureRemotely(callStmt, dest)) + { + return; + } + + /* + * Stored procedures are a bit strange in the sense that some statements + * are not in a transaction block, but can be rolled back. We need to + * make sure we send all statements in a transaction block. The + * StoredProcedureLevel variable signals this to the router executor + * and indicates how deep in the call stack we are in case of nested + * stored procedures. + */ + StoredProcedureLevel += 1; + + PG_TRY(); + { + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); + + StoredProcedureLevel -= 1; + } + PG_CATCH(); + { + StoredProcedureLevel -= 1; + PG_RE_THROW(); + } + PG_END_TRY(); + + return; + } + else if (IsA(parsetree, DoStmt)) + { + /* + * All statements in a DO block are executed in a single transaciton, + * so we need to keep track of whether we are inside a DO block. + */ + DoBlockLevel += 1; + + PG_TRY(); + { + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); + + DoBlockLevel -= 1; + } + PG_CATCH(); + { + DoBlockLevel -= 1; + PG_RE_THROW(); + } + PG_END_TRY(); + + return; + } + + ProcessUtilityInternal(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +} + + +/* + * ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority + * of the Citus specific utility statements are handled here. The distinction between + * both functions is that Citus_ProcessUtility does not handle CALL and DO statements. + * The reason for the distinction is implemented to be able to find the "top-level" DDL + * commands (not internal/cascading ones). UtilityHookLevel variable is used to achieve + * this goal. + */ +static void +ProcessUtilityInternal(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag) +{ + Node *parsetree = pstmt->utilityStmt; + List *ddlJobs = NIL; if (IsA(parsetree, ExplainStmt) && IsA(((ExplainStmt *) parsetree)->query, Query)) @@ -214,73 +310,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, parsetree = ProcessCreateSubscriptionStmt(createSubStmt); } - if (IsA(parsetree, CallStmt)) - { - CallStmt *callStmt = (CallStmt *) parsetree; - - /* - * If the procedure is distributed and we are using MX then we have the - * possibility of calling it on the worker. If the data is located on - * the worker this can avoid making many network round trips. - */ - if (context == PROCESS_UTILITY_TOPLEVEL && - CallDistributedProcedureRemotely(callStmt, dest)) - { - return; - } - - /* - * Stored procedures are a bit strange in the sense that some statements - * are not in a transaction block, but can be rolled back. We need to - * make sure we send all statements in a transaction block. The - * StoredProcedureLevel variable signals this to the router executor - * and indicates how deep in the call stack we are in case of nested - * stored procedures. - */ - StoredProcedureLevel += 1; - - PG_TRY(); - { - standard_ProcessUtility(pstmt, queryString, context, - params, queryEnv, dest, completionTag); - - StoredProcedureLevel -= 1; - } - PG_CATCH(); - { - StoredProcedureLevel -= 1; - PG_RE_THROW(); - } - PG_END_TRY(); - - return; - } - - if (IsA(parsetree, DoStmt)) - { - /* - * All statements in a DO block are executed in a single transaciton, - * so we need to keep track of whether we are inside a DO block. - */ - DoBlockLevel += 1; - - PG_TRY(); - { - standard_ProcessUtility(pstmt, queryString, context, - params, queryEnv, dest, completionTag); - - DoBlockLevel -= 1; - } - PG_CATCH(); - { - DoBlockLevel -= 1; - PG_RE_THROW(); - } - PG_END_TRY(); - - return; - } - /* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */ if (IsA(parsetree, VariableSetStmt)) { @@ -474,6 +503,8 @@ multi_ProcessUtility(PlannedStmt *pstmt, * the available version is different than the current version of Citus. In this case, * ALTER EXTENSION citus UPDATE command can actually update Citus to a new version. */ + bool isCreateAlterExtensionUpdateCitusStmt = + IsCreateAlterExtensionUpdateCitusStmt(parsetree); bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt && IsA(parsetree, AlterExtensionStmt);