diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ae2ae34b7..e5f6620ec 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -855,6 +855,29 @@ ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported) } +/* + * ExecuteUtilityTaskListExtended is a wrapper around executing task + * list for utility commands. + */ +uint64 +ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, + bool localExecutionSupported) +{ + RowModifyLevel modLevel = ROW_MODIFY_NONE; + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, utilityTaskList, poolSize, localExecutionSupported + ); + + bool excludeFromXact = false; + executionParams->xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, + excludeFromXact); + executionParams->isUtilityCommand = true; + + return ExecuteTaskListExtended(executionParams); +} + + /* * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended * with defaults for some of the arguments. diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 100998f40..802a8aac5 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -366,35 +366,39 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT static void LocallyExecuteUtilityTask(const char *localTaskQueryCommand) { - RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand); + List *parseTreeList = pg_parse_query(localTaskQueryCommand); + RawStmt *localTaskRawStmt = NULL; - Node *localTaskRawParseTree = localTaskRawStmt->stmt; - - /* - * Actually, the query passed to this function would mostly be a - * utility command to be executed locally. However, some utility - * commands do trigger udf calls (e.g worker_apply_shard_ddl_command) - * to execute commands in a generic way. But as we support local - * execution of utility commands, we should also process those udf - * calls locally as well. In that case, we simply execute the query - * implying the udf call in below conditional block. - */ - if (IsA(localTaskRawParseTree, SelectStmt)) + foreach_ptr(localTaskRawStmt, parseTreeList) { - /* we have no external parameters to rewrite the UDF call RawStmt */ - Query *localUdfTaskQuery = - RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0); + Node *localTaskRawParseTree = localTaskRawStmt->stmt; - LocallyExecuteUdfTaskQuery(localUdfTaskQuery); - } - else - { /* - * It is a regular utility command we should execute it locally via - * process utility. + * Actually, the query passed to this function would mostly be a + * utility command to be executed locally. However, some utility + * commands do trigger udf calls (e.g worker_apply_shard_ddl_command) + * to execute commands in a generic way. But as we support local + * execution of utility commands, we should also process those udf + * calls locally as well. In that case, we simply execute the query + * implying the udf call in below conditional block. */ - CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + if (IsA(localTaskRawParseTree, SelectStmt)) + { + /* we have no external parameters to rewrite the UDF call RawStmt */ + Query *localUdfTaskQuery = + RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0); + + LocallyExecuteUdfTaskQuery(localUdfTaskQuery); + } + else + { + /* + * It is a regular utility command we should execute it locally via + * process utility. + */ + CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + } } } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index c6c8d57d2..d71cc00d0 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -550,7 +550,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, poolSize = MaxAdaptiveExecutorPoolSize; } bool localExecutionSupported = true; - ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, localExecutionSupported); + ExecuteUtilityTaskListExtended(taskList, poolSize, localExecutionSupported); } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 3fec4b8a5..55fe0726c 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -13,6 +13,8 @@ extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); +extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, + bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList);