From ed66517e953239302a4033861bb83e52dafa8193 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 13 Feb 2020 15:39:45 +0300 Subject: [PATCH] reimplement ExecuteUtilityTaskListWithoutResults for local execution and refactor its usages --- .../distributed/commands/utility_hook.c | 13 ++++-- src/backend/distributed/commands/vacuum.c | 6 ++- .../distributed/executor/adaptive_executor.c | 45 +++++++++++++++++-- src/include/distributed/multi_executor.h | 3 +- 4 files changed, 56 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 7ed7a1928..10faf3db8 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -715,8 +715,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); } - /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList); + /* local execution is not implemented for this code path */ + bool tryLocalExecution = false; + + ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported); } else { @@ -727,8 +729,11 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList); + /* local execution is not implemented for this code path */ + bool tryLocalExecution = false; + + ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, + localExecutionSupported); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 62bc45537..ec3b9c77d 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -108,8 +108,10 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex); List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList); - /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(taskList); + /* local execution is not implemented for VACUUM commands */ + bool localExecutionSupported = false; + + ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); executedVacuumCount++; } relationIndex++; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a7dc7c67f..3b9f7d531 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -814,13 +814,50 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution) /* * ExecuteUtilityTaskListWithoutResults is a wrapper around executing task - * list for utility commands. It simply calls in adaptive executor's task - * execution function. + * list for utility commands. For remote tasks, it simply calls in adaptive + * executor's task execution function. For local tasks (if any), kicks Process + * Utility via CitusProcessUtility for utility commands. As some local utility + * commands can trigger udf calls, this function also processes those udf calls + * locally. */ void -ExecuteUtilityTaskListWithoutResults(List *taskList) +ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported) { - ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize); + RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE; + + List *localTaskList = NIL; + List *remoteTaskList = NIL; + + /* + * Divide tasks into two if localExecutionSupported is set to true and execute + * the local tasks + */ + if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) + { + /* + * Either we are executing a utility command or a UDF call triggered + * by such a command, it has to be a modifying one + */ + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + + /* execute local tasks */ + ExecuteLocalUtilityTaskList(localTaskList); + } + else + { + /* all tasks should be executed via remote connections */ + remoteTaskList = taskList; + } + + /* execute remote tasks if any */ + if (list_length(remoteTaskList) > 0) + { + ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize); + } } diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index d1167b243..7ccbfd3e8 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -84,7 +84,8 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); -extern void ExecuteUtilityTaskListWithoutResults(List *taskList); +extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool + localExecutionSupported); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize); extern bool IsCitusCustomState(PlanState *planState);