reimplement ExecuteUtilityTaskListWithoutResults for local execution and refactor its usages

improve-drop-trigger2
Onur Tirtir 2020-02-13 15:39:45 +03:00
parent afc942c6af
commit ed66517e95
4 changed files with 56 additions and 11 deletions

View File

@ -715,8 +715,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
}
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
/* local execution is not implemented for this code path */
bool tryLocalExecution = false;
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported);
}
else
{
@ -727,8 +729,11 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
/* local execution is not implemented for this code path */
bool tryLocalExecution = false;
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList,
localExecutionSupported);
if (shouldSyncMetadata)
{

View File

@ -108,8 +108,10 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList);
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(taskList);
/* local execution is not implemented for VACUUM commands */
bool localExecutionSupported = false;
ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
executedVacuumCount++;
}
relationIndex++;

View File

@ -814,13 +814,50 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
/*
* ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
* list for utility commands. It simply calls in adaptive executor's task
* execution function.
* list for utility commands. For remote tasks, it simply calls in adaptive
* executor's task execution function. For local tasks (if any), kicks Process
* Utility via CitusProcessUtility for utility commands. As some local utility
* commands can trigger udf calls, this function also processes those udf calls
* locally.
*/
void
ExecuteUtilityTaskListWithoutResults(List *taskList)
ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported)
{
ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize);
RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE;
List *localTaskList = NIL;
List *remoteTaskList = NIL;
/*
* Divide tasks into two if localExecutionSupported is set to true and execute
* the local tasks
*/
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
{
/*
* Either we are executing a utility command or a UDF call triggered
* by such a command, it has to be a modifying one
*/
bool readOnlyPlan = false;
/* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList);
/* execute local tasks */
ExecuteLocalUtilityTaskList(localTaskList);
}
else
{
/* all tasks should be executed via remote connections */
remoteTaskList = taskList;
}
/* execute remote tasks if any */
if (list_length(remoteTaskList) > 0)
{
ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize);
}
}

View File

@ -84,7 +84,8 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool
localExecutionSupported);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);
extern bool IsCitusCustomState(PlanState *planState);