From 22c903b151fa88aeda1532b847fe1afa8a4920c8 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 7 May 2020 13:30:50 +0300 Subject: [PATCH] remove ExecuteUtilityTaskListWithoutResults (#3696) This PR removes ExecuteUtilityTaskListWithoutResults and uses the same path for local execution via ExecuteTaskListExtended. ExecuteUtilityTaskList is added. ExecuteLocalTaskListExtended now has a parameter for utility commands so that it can call the right method. In order not to change the existing calls, ExecutionParams gains an isUtilityCommand field that defaults to false; ExecuteTaskListExtended remains the main method that runs the execution, via local and remote execution. --- .../distributed/commands/utility_hook.c | 14 +--- src/backend/distributed/commands/vacuum.c | 4 +- .../distributed/executor/adaptive_executor.c | 77 ++++++------------- .../distributed/executor/local_executor.c | 69 ++++++++--------- .../distributed/master/master_truncate.c | 4 +- src/include/distributed/adaptive_executor.h | 1 + src/include/distributed/local_executor.h | 5 +- src/include/distributed/multi_executor.h | 6 +- 8 files changed, 72 insertions(+), 108 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 710654bf4..fbed95a5f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -37,6 +37,7 @@ #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/commands/multi_copy.h" @@ -680,14 +681,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsurePartitionTableNotReplicated(targetRelationId); } - /* - * If it is a local placement of a distributed table or a reference table, - * then execute the DDL command locally. 
- * Here we set localExecutionSupported to true regardless of whether the - * DDL command is run for/on a distributed table as - * ExecuteUtilityTaskListWithoutResults would already identify those - * DDL tasks not accessing any of the local placements. - */ bool localExecutionSupported = true; if (!ddlJob->concurrentIndexCmd) @@ -710,7 +703,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); } - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported); + ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported); } else { @@ -721,8 +714,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, - localExecutionSupported); + ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index ddec8c1a3..db32e23fe 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -16,6 +16,7 @@ #include "commands/defrem.h" #endif #include "commands/vacuum.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparse_shard_query.h" @@ -112,8 +113,7 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) /* local execution is not implemented for VACUUM commands */ bool localExecutionSupported = false; - - ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); + ExecuteUtilityTaskList(taskList, localExecutionSupported); executedVacuumCount++; } relationIndex++; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 6779ff764..e03a0330b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c 
@@ -628,7 +628,6 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events eventCount, bool *cancellationReceived); static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); - /* * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor * run. Given that the result of our subplans would be evaluated before the first call to @@ -787,10 +786,12 @@ static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { EState *estate = ScanStateGetExecutorState(scanState); + bool isUtilityCommand = false; uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, estate->es_param_list_info, scanState->distributedPlan, - scanState->tuplestorestate); + scanState->tuplestorestate, + isUtilityCommand); /* * We're deliberately not setting execution->rowsProcessed here. The main reason @@ -819,57 +820,21 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution) /* - * ExecuteUtilityTaskListWithoutResults is a wrapper around executing task - * list for utility commands. For remote tasks, it simply calls in adaptive - * executor's task execution function. For local tasks (if any), kicks Process - * Utility via CitusProcessUtility for utility commands. As some local utility - * commands can trigger udf calls, this function also processes those udf calls - * locally. + * ExecuteUtilityTaskList is a wrapper around executing task + * list for utility commands. 
*/ -void -ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported) +uint64 +ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported) { - RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE; + RowModifyLevel modLevel = ROW_MODIFY_NONE; + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, utilityTaskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported + ); + executionParams->xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, false); + executionParams->isUtilityCommand = true; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - - /* - * Divide tasks into two if localExecutionSupported is set to true and execute - * the local tasks - */ - if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) - { - /* - * Either we are executing a utility command or a UDF call triggered - * by such a command, it has to be a modifying one - */ - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - - /* execute local tasks */ - ExecuteLocalUtilityTaskList(localTaskList); - } - else - { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; - } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - /* - * We already executed tasks locally. We should ideally remove this method and - * let ExecuteTaskListExtended handle the local execution. 
- */ - localExecutionSupported = false; - ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize, - localExecutionSupported); - } + return ExecuteTaskListExtended(executionParams); } @@ -977,8 +942,15 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) ErrorIfTransactionAccessedPlacementsLocally(); } - locallyProcessedRows += ExecuteLocalTaskList(localTaskList, - executionParams->tupleStore); + if (executionParams->isUtilityCommand) + { + locallyProcessedRows += ExecuteLocalUtilityTaskList(localTaskList); + } + else + { + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, + executionParams->tupleStore); + } if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { @@ -1020,6 +992,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, executionParams->tupleStore = NULL; executionParams->tupleDescriptor = NULL; executionParams->hasReturning = false; + executionParams->isUtilityCommand = false; executionParams->jobIdList = NIL; return executionParams; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 32771d68b..f8a8c14e1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -147,8 +147,30 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) } DistributedPlan *distributedPlan = NULL; ParamListInfo paramListInfo = NULL; + bool isUtilityCommand = false; return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, - tupleStoreState); + tupleStoreState, isUtilityCommand); +} + + +/* + * ExecuteLocalUtilityTaskList executes the given tasks locally. + * + * The function returns totalRowsProcessed. 
+ */ +uint64 +ExecuteLocalUtilityTaskList(List *utilityTaskList) +{ + if (list_length(utilityTaskList) == 0) + { + return 0; + } + DistributedPlan *distributedPlan = NULL; + ParamListInfo paramListInfo = NULL; + Tuplestorestate *tupleStoreState = NULL; + bool isUtilityCommand = true; + return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan, + tupleStoreState, isUtilityCommand); } @@ -163,9 +185,11 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) * The function returns totalRowsProcessed. */ uint64 -ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, +ExecuteLocalTaskListExtended(List *taskList, + ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState) + Tuplestorestate *tupleStoreState, + bool isUtilityCommand) { ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); int numParams = 0; @@ -202,6 +226,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, } LogLocalCommand(task); + if (isUtilityCommand) + { + LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task)); + continue; + } PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); @@ -271,7 +300,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, localPlan = planner(shardQuery, cursorOptions, paramListInfo); } - char *shardQueryString = NULL; if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { @@ -332,39 +360,6 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT } -/* - * ExecuteLocalUtilityTaskList executes a list of tasks locally. This function - * also logs local execution notice for each task and sets - * TransactionAccessedLocalPlacement to true for next set of possible queries - * & commands within the current transaction block. See the comment in function. 
- */ -void -ExecuteLocalUtilityTaskList(List *localTaskList) -{ - Task *localTask = NULL; - - foreach_ptr(localTask, localTaskList) - { - const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask); - - /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ - Assert(localTask->anchorShardId != INVALID_SHARD_ID); - - Assert(TaskAccessesLocalNode(localTask)); - - /* - * We should register the access to local placement to force the local - * execution of the following commands withing the current transaction. - */ - SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); - - LogLocalCommand(localTask); - - LocallyExecuteUtilityTask(localTaskQueryCommand); - } -} - - /* * LocallyExecuteUtilityTask executes the given local task query in the current * session. diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 76ad4c091..fb92d3f11 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -17,6 +17,7 @@ #include "commands/tablecmds.h" #include "commands/trigger.h" #include "distributed/citus_ruleutils.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparse_shard_query.h" #include "distributed/foreign_key_relationship.h" @@ -85,8 +86,7 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) * then execute TRUNCATE command locally. 
*/ bool localExecutionSupported = true; - - ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); + ExecuteUtilityTaskList(taskList, localExecutionSupported); } PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 3dd804e06..3fec4b8a5 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -12,6 +12,7 @@ extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool localExecutionSupported); +extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index f42a7c47b..37a5696f0 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -29,11 +29,12 @@ extern enum LocalExecutionStatus CurrentLocalExecutionStatus; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState); +extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState); -extern void ExecuteLocalUtilityTaskList(List *localTaskList); + Tuplestorestate *tupleStoreState, + bool isUtilityCommand); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index a2755e640..9670b0a6d 100644 --- a/src/include/distributed/multi_executor.h +++ 
b/src/include/distributed/multi_executor.h @@ -108,6 +108,10 @@ typedef struct ExecutionParams /* localExecutionSupported is true if we can use local execution, if it is false * local execution will not be used. */ bool localExecutionSupported; + + /* isUtilityCommand is true if the current execution is for a utility + * command such as a DDL command.*/ + bool isUtilityCommand; } ExecutionParams; ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel, @@ -120,8 +124,6 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); -extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool - localExecutionSupported); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);